Remove DefaultChannelHandlerContext.needsLazyBufInit which is used only by the head handler
This commit is contained in:
parent
f1ecb4ab1a
commit
d55567e21b
@ -45,7 +45,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final ChannelHandler handler;
|
private final ChannelHandler handler;
|
||||||
private boolean needsLazyBufInit;
|
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
// child executor.
|
// child executor.
|
||||||
@ -182,7 +181,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
executor = null;
|
executor = null;
|
||||||
inByteBuf = null;
|
inByteBuf = null;
|
||||||
inMsgBuf = null;
|
inMsgBuf = null;
|
||||||
needsLazyBufInit = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, TailHandler handler) {
|
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name, TailHandler handler) {
|
||||||
@ -302,30 +300,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void lazyInitHeadHandler() {
|
void initHeadHandler() {
|
||||||
if (needsLazyBufInit) {
|
// Must be called for the head handler.
|
||||||
EventExecutor exec = executor();
|
HeadHandler h = (HeadHandler) handler;
|
||||||
if (exec.inEventLoop()) {
|
if (h.initialized) {
|
||||||
if (needsLazyBufInit) {
|
return;
|
||||||
needsLazyBufInit = false;
|
|
||||||
HeadHandler headHandler = (HeadHandler) handler;
|
|
||||||
headHandler.init(this);
|
|
||||||
outByteBuf = headHandler.byteSink;
|
|
||||||
outMsgBuf = headHandler.msgSink;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
getFromFuture(exec.submit(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
lazyInitHeadHandler();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new ChannelPipelineException("failed to initialize an outbound buffer lazily", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert executor().inEventLoop();
|
||||||
|
|
||||||
|
h.init(this);
|
||||||
|
h.initialized = true;
|
||||||
|
outByteBuf = h.byteSink;
|
||||||
|
outMsgBuf = h.msgSink;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillInboundBridge() {
|
private void fillInboundBridge() {
|
||||||
@ -563,41 +550,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
waitForFuture(executor().submit(r));
|
waitForFuture(executor().submit(r));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Waits for a future to finish and gets the result. If the task is interrupted, then the current thread
|
|
||||||
* will be interrupted and this will return {@code null}. It is expected that the task performs any
|
|
||||||
* appropriate locking.
|
|
||||||
* <p>
|
|
||||||
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error},
|
|
||||||
* {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an {@link AssertionError}
|
|
||||||
* and that is thrown instead.</p>
|
|
||||||
*
|
|
||||||
* @param future wait for this future
|
|
||||||
* @param <T> the return value type
|
|
||||||
* @return the task's return value, or {@code null} if the task was interrupted.
|
|
||||||
* @see Future#get()
|
|
||||||
* @throws Error if the task threw this.
|
|
||||||
* @throws RuntimeException if the task threw this.
|
|
||||||
* @throws Exception if the task threw this.
|
|
||||||
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
|
||||||
* {@link Throwable}.
|
|
||||||
*/
|
|
||||||
private static <T> T getFromFuture(Future<T> future) throws Exception {
|
|
||||||
try {
|
|
||||||
return future.get();
|
|
||||||
} catch (ExecutionException ex) {
|
|
||||||
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
|
||||||
|
|
||||||
PlatformDependent.throwException(ex.getCause());
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
// Interrupt the calling thread (note that this method is not called from the event loop)
|
|
||||||
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
||||||
* It is expected that the task performs any appropriate locking.
|
* It is expected that the task performs any appropriate locking.
|
||||||
@ -740,7 +692,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelRegistered() {
|
public ChannelHandlerContext fireChannelRegistered() {
|
||||||
lazyInitHeadHandler();
|
|
||||||
final DefaultChannelHandlerContext next = findContextInbound();
|
final DefaultChannelHandlerContext next = findContextInbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
@ -793,7 +744,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelActive() {
|
public ChannelHandlerContext fireChannelActive() {
|
||||||
lazyInitHeadHandler();
|
|
||||||
final DefaultChannelHandlerContext next = findContextInbound();
|
final DefaultChannelHandlerContext next = findContextInbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
|
@ -908,6 +908,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelRegistered() {
|
public ChannelPipeline fireChannelRegistered() {
|
||||||
|
head.initHeadHandler();
|
||||||
head.fireChannelRegistered();
|
head.fireChannelRegistered();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
@ -926,6 +927,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
@Override
|
@Override
|
||||||
public ChannelPipeline fireChannelActive() {
|
public ChannelPipeline fireChannelActive() {
|
||||||
firedChannelActive = true;
|
firedChannelActive = true;
|
||||||
|
head.initHeadHandler();
|
||||||
head.fireChannelActive();
|
head.fireChannelActive();
|
||||||
|
|
||||||
if (channel.config().isAutoRead()) {
|
if (channel.config().isAutoRead()) {
|
||||||
@ -1200,12 +1202,14 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
protected final Unsafe unsafe;
|
protected final Unsafe unsafe;
|
||||||
ByteBuf byteSink;
|
ByteBuf byteSink;
|
||||||
MessageBuf<Object> msgSink;
|
MessageBuf<Object> msgSink;
|
||||||
|
boolean initialized;
|
||||||
|
|
||||||
protected HeadHandler(Unsafe unsafe) {
|
protected HeadHandler(Unsafe unsafe) {
|
||||||
this.unsafe = unsafe;
|
this.unsafe = unsafe;
|
||||||
}
|
}
|
||||||
|
|
||||||
void init(ChannelHandlerContext ctx) {
|
void init(ChannelHandlerContext ctx) {
|
||||||
|
assert !initialized;
|
||||||
switch (ctx.channel().metadata().bufferType()) {
|
switch (ctx.channel().metadata().bufferType()) {
|
||||||
case BYTE:
|
case BYTE:
|
||||||
byteSink = ctx.alloc().ioBuffer();
|
byteSink = ctx.alloc().ioBuffer();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user