Little bit of optimization
This commit is contained in:
parent
e2a617b07b
commit
4440386494
@ -72,7 +72,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
private final Channel parent;
|
||||
private final Integer id;
|
||||
private final Unsafe unsafe;
|
||||
private final ChannelPipeline pipeline = new DefaultChannelPipeline(this);
|
||||
private final ChannelPipeline pipeline;
|
||||
private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
|
||||
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
|
||||
private final CloseFuture closeFuture = new CloseFuture(this);
|
||||
@ -131,6 +131,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
allChannels.remove(id());
|
||||
}
|
||||
});
|
||||
|
||||
pipeline = new DefaultChannelPipeline(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -150,6 +152,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
|
||||
@Override
|
||||
public EventLoop eventLoop() {
|
||||
EventLoop eventLoop = this.eventLoop;
|
||||
if (eventLoop == null) {
|
||||
throw new IllegalStateException("channel not registered to an event loop");
|
||||
}
|
||||
@ -581,13 +584,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt/perform outbound I/O if:
|
||||
// - the channel is inactive - flush0() will fail the futures.
|
||||
// - the event loop has no plan to call flushForcibly().
|
||||
if (!inFlushNow) {
|
||||
if (!inFlushNow) { // Avoid re-entrance
|
||||
try {
|
||||
if (!isActive() || !isFlushPending()) {
|
||||
if (!isFlushPending()) {
|
||||
flushNow();
|
||||
} else {
|
||||
// Event loop will call flushNow() later by itself.
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
notifyFlushFutures(t);
|
||||
@ -647,10 +649,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
close(voidFuture());
|
||||
}
|
||||
}
|
||||
|
||||
if (!isActive()) {
|
||||
close(unsafe().voidFuture());
|
||||
}
|
||||
} finally {
|
||||
inFlushNow = false;
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import java.util.Queue;
|
||||
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelInboundHandlerContext<Object>, ChannelOutboundHandlerContext<Object> {
|
||||
volatile DefaultChannelHandlerContext next;
|
||||
volatile DefaultChannelHandlerContext prev;
|
||||
private final Channel channel;
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
final EventExecutor executor;
|
||||
private final String name;
|
||||
@ -110,6 +111,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
this.prev = prev;
|
||||
this.next = next;
|
||||
|
||||
channel = pipeline.channel;
|
||||
this.pipeline = pipeline;
|
||||
this.name = name;
|
||||
this.handler = handler;
|
||||
@ -153,7 +155,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public Channel channel() {
|
||||
return pipeline.channel;
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -164,7 +166,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
@Override
|
||||
public EventExecutor executor() {
|
||||
if (executor == null) {
|
||||
return channel().eventLoop();
|
||||
return channel.eventLoop();
|
||||
} else {
|
||||
return executor;
|
||||
}
|
||||
@ -360,16 +362,16 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public ChannelFuture newFuture() {
|
||||
return channel().newFuture();
|
||||
return channel.newFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return channel().newSucceededFuture();
|
||||
return channel.newSucceededFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||
return channel().newFailedFuture(cause);
|
||||
return channel.newFailedFuture(cause);
|
||||
}
|
||||
}
|
@ -38,6 +38,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
|
||||
|
||||
final Channel channel;
|
||||
private final Channel.Unsafe unsafe;
|
||||
private final ChannelBufferHolder<Object> directOutbound;
|
||||
|
||||
private volatile DefaultChannelHandlerContext head;
|
||||
private volatile DefaultChannelHandlerContext tail;
|
||||
private final Map<String, DefaultChannelHandlerContext> name2ctx =
|
||||
@ -53,6 +56,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
throw new NullPointerException("channel");
|
||||
}
|
||||
this.channel = channel;
|
||||
unsafe = channel.unsafe();
|
||||
directOutbound = unsafe.directOutbound();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -667,9 +672,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
ChannelBuffer nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
|
||||
for (;;) {
|
||||
if (ctx == null) {
|
||||
ChannelBufferHolder<Object> lastOut = channel().unsafe().directOutbound();
|
||||
if (lastOut.hasByteBuffer()) {
|
||||
return lastOut.byteBuffer();
|
||||
if (directOutbound.hasByteBuffer()) {
|
||||
return directOutbound.byteBuffer();
|
||||
} else {
|
||||
throw NoSuchBufferException.INSTANCE;
|
||||
}
|
||||
@ -686,9 +690,8 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
Queue<Object> nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
|
||||
for (;;) {
|
||||
if (ctx == null) {
|
||||
ChannelBufferHolder<Object> lastOut = channel().unsafe().directOutbound();
|
||||
if (lastOut.hasMessageBuffer()) {
|
||||
return lastOut.messageBuffer();
|
||||
if (directOutbound.hasMessageBuffer()) {
|
||||
return directOutbound.messageBuffer();
|
||||
} else {
|
||||
throw NoSuchBufferException.INSTANCE;
|
||||
}
|
||||
@ -942,7 +945,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().bind(localAddress, future);
|
||||
unsafe.bind(localAddress, future);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
@ -980,7 +983,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().connect(remoteAddress, localAddress, future);
|
||||
unsafe.connect(remoteAddress, localAddress, future);
|
||||
}
|
||||
|
||||
return future;
|
||||
@ -1010,7 +1013,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().disconnect(future);
|
||||
unsafe.disconnect(future);
|
||||
}
|
||||
|
||||
return future;
|
||||
@ -1040,7 +1043,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().close(future);
|
||||
unsafe.close(future);
|
||||
}
|
||||
|
||||
return future;
|
||||
@ -1070,7 +1073,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().deregister(future);
|
||||
unsafe.deregister(future);
|
||||
}
|
||||
|
||||
return future;
|
||||
@ -1086,16 +1089,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
if (ctx != null) {
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
try {
|
||||
((ChannelOutboundHandler<Object>) ctx.handler()).flush(ctx, future);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
ChannelBufferHolder<Object> outbound = ctx.outbound();
|
||||
if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) {
|
||||
outbound.byteBuffer().discardReadBytes();
|
||||
}
|
||||
}
|
||||
flush0(ctx, future);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
@ -1105,12 +1099,25 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
});
|
||||
}
|
||||
} else {
|
||||
channel().unsafe().flush(future);
|
||||
unsafe.flush(future);
|
||||
}
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private void flush0(final DefaultChannelHandlerContext ctx, ChannelFuture future) {
|
||||
try {
|
||||
((ChannelOutboundHandler<Object>) ctx.handler()).flush(ctx, future);
|
||||
} catch (Throwable t) {
|
||||
notifyHandlerException(t);
|
||||
} finally {
|
||||
ChannelBufferHolder<Object> outbound = ctx.outbound();
|
||||
if (!outbound.isBypass() && outbound.isEmpty() && outbound.hasByteBuffer()) {
|
||||
outbound.byteBuffer().discardReadBytes();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object message, ChannelFuture future) {
|
||||
return write(firstOutboundContext(), message, future);
|
||||
@ -1129,7 +1136,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
out = ctx.outbound();
|
||||
} else {
|
||||
executor = channel().eventLoop();
|
||||
out = channel().unsafe().directOutbound();
|
||||
out = directOutbound;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
@ -1143,7 +1150,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
"cannot write a message whose type is not " +
|
||||
ChannelBuffer.class.getSimpleName() + ": " + message.getClass().getName());
|
||||
}
|
||||
return flush(ctx, future);
|
||||
if (ctx != null) {
|
||||
flush0(ctx, future);
|
||||
} else {
|
||||
unsafe.flush(future);
|
||||
}
|
||||
return future;
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user