/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
|
2012-06-02 02:51:19 +02:00
|
|
|
package io.netty.channel;
|
|
|
|
|
2012-06-11 16:04:04 +02:00
|
|
|
import static io.netty.channel.DefaultChannelPipeline.*;
|
2012-06-10 04:08:43 +02:00
|
|
|
import io.netty.buffer.ByteBuf;
|
2012-06-10 05:22:32 +02:00
|
|
|
import io.netty.buffer.ChannelBuf;
|
|
|
|
import io.netty.buffer.MessageBuf;
|
2012-06-11 10:02:00 +02:00
|
|
|
import io.netty.buffer.Unpooled;
|
2012-06-02 02:51:19 +02:00
|
|
|
import io.netty.util.DefaultAttributeMap;
|
2012-06-04 09:24:34 +02:00
|
|
|
import io.netty.util.internal.QueueFactory;
|
2012-06-02 02:51:19 +02:00
|
|
|
|
|
|
|
import java.net.SocketAddress;
|
2012-06-07 07:52:33 +02:00
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.EnumSet;
|
|
|
|
import java.util.Set;
|
2012-06-04 03:51:42 +02:00
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
2012-06-02 02:51:19 +02:00
|
|
|
|
2012-06-07 07:52:33 +02:00
|
|
|
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
|
|
|
|
|
|
|
|
private static final EnumSet<ChannelHandlerType> EMPTY_TYPE = EnumSet.noneOf(ChannelHandlerType.class);
|
|
|
|
|
2012-06-09 02:44:30 +02:00
|
|
|
static final int DIR_INBOUND = 0x00000001;
|
|
|
|
static final int DIR_OUTBOUND = 0x80000000;
|
|
|
|
|
2012-06-02 02:51:19 +02:00
|
|
|
volatile DefaultChannelHandlerContext next;
|
|
|
|
volatile DefaultChannelHandlerContext prev;
|
2012-06-02 03:34:19 +02:00
|
|
|
private final Channel channel;
|
2012-06-02 02:51:19 +02:00
|
|
|
private final DefaultChannelPipeline pipeline;
|
2012-06-04 09:24:34 +02:00
|
|
|
EventExecutor executor; // not thread-safe but OK because it never changes once set.
|
2012-06-02 02:51:19 +02:00
|
|
|
private final String name;
|
2012-06-09 02:44:30 +02:00
|
|
|
private final Set<ChannelHandlerType> type;
|
|
|
|
final int directions;
|
2012-06-02 02:51:19 +02:00
|
|
|
private final ChannelHandler handler;
|
2012-06-07 07:52:33 +02:00
|
|
|
|
2012-06-11 03:43:47 +02:00
|
|
|
final MessageBuf<Object> inMsgBuf;
|
2012-06-10 04:08:43 +02:00
|
|
|
final ByteBuf inByteBuf;
|
2012-06-11 03:43:47 +02:00
|
|
|
final MessageBuf<Object> outMsgBuf;
|
2012-06-10 04:08:43 +02:00
|
|
|
final ByteBuf outByteBuf;
|
2012-06-04 03:51:42 +02:00
|
|
|
|
|
|
|
// When the two handlers run in a different thread and they are next to each other,
|
2012-06-04 09:24:34 +02:00
|
|
|
// each other's buffers can be accessed at the same time resulting in a race condition.
|
2012-06-04 03:51:42 +02:00
|
|
|
// To avoid such situation, we lazily creates an additional thread-safe buffer called
|
|
|
|
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
|
|
|
|
// The content written into a bridge is flushed into the actual buffer by flushBridge().
|
2012-06-04 09:24:34 +02:00
|
|
|
final AtomicReference<MessageBridge> inMsgBridge;
|
|
|
|
final AtomicReference<MessageBridge> outMsgBridge;
|
2012-06-09 14:05:59 +02:00
|
|
|
final AtomicReference<ByteBridge> inByteBridge;
|
|
|
|
final AtomicReference<ByteBridge> outByteBridge;
|
2012-06-02 02:51:19 +02:00
|
|
|
|
|
|
|
// Runnables that calls handlers
|
|
|
|
final Runnable fireChannelRegisteredTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 07:52:33 +02:00
|
|
|
((ChannelStateHandler) ctx.handler).channelRegistered(ctx);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
final Runnable fireChannelUnregisteredTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 07:52:33 +02:00
|
|
|
((ChannelStateHandler) ctx.handler).channelUnregistered(ctx);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
final Runnable fireChannelActiveTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 07:52:33 +02:00
|
|
|
((ChannelStateHandler) ctx.handler).channelActive(ctx);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
final Runnable fireChannelInactiveTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 07:52:33 +02:00
|
|
|
((ChannelStateHandler) ctx.handler).channelInactive(ctx);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-06-04 09:24:34 +02:00
|
|
|
final Runnable curCtxFireInboundBufferUpdatedTask = new Runnable() {
|
2012-06-02 02:51:19 +02:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
2012-06-04 03:51:42 +02:00
|
|
|
flushBridge();
|
2012-06-02 02:51:19 +02:00
|
|
|
try {
|
2012-06-07 09:56:21 +02:00
|
|
|
((ChannelStateHandler) ctx.handler).inboundBufferUpdated(ctx);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
} finally {
|
2012-06-10 04:08:43 +02:00
|
|
|
ByteBuf buf = inByteBuf;
|
2012-06-07 07:52:33 +02:00
|
|
|
if (buf != null) {
|
2012-06-04 20:56:00 +02:00
|
|
|
if (!buf.readable()) {
|
|
|
|
buf.discardReadBytes();
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-06-04 09:24:34 +02:00
|
|
|
private final Runnable nextCtxFireInboundBufferUpdatedTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
2012-06-07 07:52:33 +02:00
|
|
|
DefaultChannelHandlerContext next = nextContext(
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext.this.next, DIR_INBOUND);
|
2012-06-04 09:24:34 +02:00
|
|
|
if (next != null) {
|
|
|
|
next.fillBridge();
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.curCtxFireInboundBufferUpdatedTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.curCtxFireInboundBufferUpdatedTask);
|
|
|
|
}
|
2012-06-04 09:24:34 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-06-02 02:51:19 +02:00
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
DefaultChannelHandlerContext(
|
|
|
|
DefaultChannelPipeline pipeline, EventExecutor executor,
|
|
|
|
DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
|
|
|
|
String name, ChannelHandler handler) {
|
|
|
|
|
|
|
|
if (name == null) {
|
|
|
|
throw new NullPointerException("name");
|
|
|
|
}
|
|
|
|
if (handler == null) {
|
|
|
|
throw new NullPointerException("handler");
|
|
|
|
}
|
|
|
|
|
2012-06-07 07:52:33 +02:00
|
|
|
// Determine the type of the specified handler.
|
2012-06-09 02:44:30 +02:00
|
|
|
int typeValue = 0;
|
2012-06-07 07:52:33 +02:00
|
|
|
EnumSet<ChannelHandlerType> type = EMPTY_TYPE.clone();
|
|
|
|
if (handler instanceof ChannelStateHandler) {
|
|
|
|
type.add(ChannelHandlerType.STATE);
|
2012-06-09 02:44:30 +02:00
|
|
|
typeValue |= DIR_INBOUND;
|
|
|
|
if (handler instanceof ChannelInboundHandler) {
|
|
|
|
type.add(ChannelHandlerType.INBOUND);
|
|
|
|
}
|
2012-06-07 07:52:33 +02:00
|
|
|
}
|
|
|
|
if (handler instanceof ChannelOperationHandler) {
|
|
|
|
type.add(ChannelHandlerType.OPERATION);
|
2012-06-09 02:44:30 +02:00
|
|
|
typeValue |= DIR_OUTBOUND;
|
|
|
|
if (handler instanceof ChannelOutboundHandler) {
|
|
|
|
type.add(ChannelHandlerType.OUTBOUND);
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
2012-06-07 07:52:33 +02:00
|
|
|
this.type = Collections.unmodifiableSet(type);
|
2012-06-10 05:22:32 +02:00
|
|
|
directions = typeValue;
|
2012-06-02 02:51:19 +02:00
|
|
|
|
|
|
|
this.prev = prev;
|
|
|
|
this.next = next;
|
|
|
|
|
2012-06-02 03:34:19 +02:00
|
|
|
channel = pipeline.channel;
|
2012-06-02 02:51:19 +02:00
|
|
|
this.pipeline = pipeline;
|
|
|
|
this.name = name;
|
|
|
|
this.handler = handler;
|
|
|
|
|
|
|
|
if (executor != null) {
|
|
|
|
// Pin one of the child executors once and remember it so that the same child executor
|
|
|
|
// is used to fire events for the same channel.
|
|
|
|
EventExecutor childExecutor = pipeline.childExecutors.get(executor);
|
|
|
|
if (childExecutor == null) {
|
|
|
|
childExecutor = executor.unsafe().nextChild();
|
|
|
|
pipeline.childExecutors.put(executor, childExecutor);
|
|
|
|
}
|
|
|
|
this.executor = childExecutor;
|
2012-06-04 20:59:31 +02:00
|
|
|
} else if (channel.isRegistered()) {
|
|
|
|
this.executor = channel.eventLoop();
|
2012-06-02 02:51:19 +02:00
|
|
|
} else {
|
|
|
|
this.executor = null;
|
|
|
|
}
|
|
|
|
|
2012-06-07 07:52:33 +02:00
|
|
|
if (type.contains(ChannelHandlerType.INBOUND)) {
|
2012-06-10 05:22:32 +02:00
|
|
|
ChannelBuf buf;
|
2012-06-02 02:51:19 +02:00
|
|
|
try {
|
2012-06-10 05:22:32 +02:00
|
|
|
buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Exception e) {
|
|
|
|
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
|
|
|
|
}
|
2012-06-04 03:51:42 +02:00
|
|
|
|
2012-06-10 05:22:32 +02:00
|
|
|
if (buf == null) {
|
|
|
|
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (buf instanceof ByteBuf) {
|
|
|
|
inByteBuf = (ByteBuf) buf;
|
2012-06-09 14:05:59 +02:00
|
|
|
inByteBridge = new AtomicReference<ByteBridge>();
|
2012-06-07 07:52:33 +02:00
|
|
|
inMsgBuf = null;
|
|
|
|
inMsgBridge = null;
|
2012-06-10 05:22:32 +02:00
|
|
|
} else if (buf instanceof MessageBuf) {
|
2012-06-07 07:52:33 +02:00
|
|
|
inByteBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
inByteBridge = null;
|
2012-06-11 03:43:47 +02:00
|
|
|
inMsgBuf = (MessageBuf<Object>) buf;
|
2012-06-07 07:52:33 +02:00
|
|
|
inMsgBridge = new AtomicReference<MessageBridge>();
|
2012-06-10 05:22:32 +02:00
|
|
|
} else {
|
2012-06-09 02:44:30 +02:00
|
|
|
throw new Error();
|
2012-06-04 03:51:42 +02:00
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
} else {
|
2012-06-07 07:52:33 +02:00
|
|
|
inByteBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
inByteBridge = null;
|
2012-06-07 07:52:33 +02:00
|
|
|
inMsgBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
inMsgBridge = null;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
2012-06-07 07:52:33 +02:00
|
|
|
|
|
|
|
if (type.contains(ChannelHandlerType.OUTBOUND)) {
|
2012-06-10 05:22:32 +02:00
|
|
|
ChannelBuf buf;
|
2012-06-02 02:51:19 +02:00
|
|
|
try {
|
2012-06-10 05:22:32 +02:00
|
|
|
buf = ((ChannelOutboundHandler) handler).newOutboundBuffer(this);
|
2012-06-02 02:51:19 +02:00
|
|
|
} catch (Exception e) {
|
|
|
|
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
|
|
|
|
}
|
2012-06-04 03:51:42 +02:00
|
|
|
|
2012-06-10 05:22:32 +02:00
|
|
|
if (buf == null) {
|
|
|
|
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (buf instanceof ByteBuf) {
|
|
|
|
outByteBuf = (ByteBuf) buf;
|
2012-06-09 14:05:59 +02:00
|
|
|
outByteBridge = new AtomicReference<ByteBridge>();
|
2012-06-07 07:52:33 +02:00
|
|
|
outMsgBuf = null;
|
|
|
|
outMsgBridge = null;
|
2012-06-10 05:22:32 +02:00
|
|
|
} else if (buf instanceof MessageBuf) {
|
2012-06-07 07:52:33 +02:00
|
|
|
outByteBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
outByteBridge = null;
|
2012-06-11 03:43:47 +02:00
|
|
|
outMsgBuf = (MessageBuf<Object>) buf;
|
2012-06-07 07:52:33 +02:00
|
|
|
outMsgBridge = new AtomicReference<MessageBridge>();
|
2012-06-10 05:22:32 +02:00
|
|
|
} else {
|
2012-06-09 02:44:30 +02:00
|
|
|
throw new Error();
|
2012-06-04 03:51:42 +02:00
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
} else {
|
2012-06-07 07:52:33 +02:00
|
|
|
outByteBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
outByteBridge = null;
|
2012-06-07 07:52:33 +02:00
|
|
|
outMsgBuf = null;
|
2012-06-04 03:51:42 +02:00
|
|
|
outMsgBridge = null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-04 09:24:34 +02:00
|
|
|
void fillBridge() {
|
|
|
|
if (inMsgBridge != null) {
|
|
|
|
MessageBridge bridge = inMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
} else if (inByteBridge != null) {
|
2012-06-09 14:05:59 +02:00
|
|
|
ByteBridge bridge = inByteBridge.get();
|
2012-06-04 09:24:34 +02:00
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (outMsgBridge != null) {
|
|
|
|
MessageBridge bridge = outMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
} else if (outByteBridge != null) {
|
2012-06-09 14:05:59 +02:00
|
|
|
ByteBridge bridge = outByteBridge.get();
|
2012-06-04 09:24:34 +02:00
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-04 03:51:42 +02:00
|
|
|
void flushBridge() {
|
|
|
|
if (inMsgBridge != null) {
|
2012-06-04 09:24:34 +02:00
|
|
|
MessageBridge bridge = inMsgBridge.get();
|
2012-06-04 03:51:42 +02:00
|
|
|
if (bridge != null) {
|
2012-06-07 07:52:33 +02:00
|
|
|
bridge.flush(inMsgBuf);
|
2012-06-04 09:24:34 +02:00
|
|
|
}
|
|
|
|
} else if (inByteBridge != null) {
|
2012-06-09 14:05:59 +02:00
|
|
|
ByteBridge bridge = inByteBridge.get();
|
2012-06-04 09:24:34 +02:00
|
|
|
if (bridge != null) {
|
2012-06-07 07:52:33 +02:00
|
|
|
bridge.flush(inByteBuf);
|
2012-06-04 03:51:42 +02:00
|
|
|
}
|
|
|
|
}
|
2012-06-04 09:24:34 +02:00
|
|
|
|
2012-06-04 03:51:42 +02:00
|
|
|
if (outMsgBridge != null) {
|
2012-06-04 09:24:34 +02:00
|
|
|
MessageBridge bridge = outMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
2012-06-07 07:52:33 +02:00
|
|
|
bridge.flush(outMsgBuf);
|
2012-06-04 09:24:34 +02:00
|
|
|
}
|
|
|
|
} else if (outByteBridge != null) {
|
2012-06-09 14:05:59 +02:00
|
|
|
ByteBridge bridge = outByteBridge.get();
|
2012-06-04 03:51:42 +02:00
|
|
|
if (bridge != null) {
|
2012-06-07 07:52:33 +02:00
|
|
|
bridge.flush(outByteBuf);
|
2012-06-04 03:51:42 +02:00
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Channel channel() {
|
2012-06-02 03:34:19 +02:00
|
|
|
return channel;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelPipeline pipeline() {
|
|
|
|
return pipeline;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public EventExecutor executor() {
|
|
|
|
if (executor == null) {
|
2012-06-04 09:24:34 +02:00
|
|
|
return executor = channel.eventLoop();
|
2012-06-02 02:51:19 +02:00
|
|
|
} else {
|
|
|
|
return executor;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelHandler handler() {
|
|
|
|
return handler;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String name() {
|
|
|
|
return name;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-07 07:52:33 +02:00
|
|
|
public Set<ChannelHandlerType> type() {
|
|
|
|
return type;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasInboundByteBuffer() {
|
|
|
|
return inByteBuf != null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasInboundMessageBuffer() {
|
|
|
|
return inMsgBuf != null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 04:08:43 +02:00
|
|
|
public ByteBuf inboundByteBuffer() {
|
2012-06-07 07:52:33 +02:00
|
|
|
if (inByteBuf == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
|
|
|
return inByteBuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@SuppressWarnings("unchecked")
|
2012-06-11 03:43:47 +02:00
|
|
|
public <T> MessageBuf<T> inboundMessageBuffer() {
|
2012-06-07 07:52:33 +02:00
|
|
|
if (inMsgBuf == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
2012-06-11 03:43:47 +02:00
|
|
|
return (MessageBuf<T>) inMsgBuf;
|
2012-06-07 07:52:33 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasOutboundByteBuffer() {
|
|
|
|
return outByteBuf != null;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-07 07:52:33 +02:00
|
|
|
public boolean hasOutboundMessageBuffer() {
|
|
|
|
return outMsgBuf != null;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 04:08:43 +02:00
|
|
|
public ByteBuf outboundByteBuffer() {
|
2012-06-07 07:52:33 +02:00
|
|
|
if (outByteBuf == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
|
|
|
return outByteBuf;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-07 07:52:33 +02:00
|
|
|
@SuppressWarnings("unchecked")
|
2012-06-11 03:43:47 +02:00
|
|
|
public <T> MessageBuf<T> outboundMessageBuffer() {
|
2012-06-07 07:52:33 +02:00
|
|
|
if (outMsgBuf == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
2012-06-11 03:43:47 +02:00
|
|
|
return (MessageBuf<T>) outMsgBuf;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
2012-06-03 13:35:38 +02:00
|
|
|
@Override
|
|
|
|
public boolean hasNextInboundByteBuffer() {
|
2012-06-08 16:11:15 +02:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (ctx.inByteBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-03 13:35:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextInboundMessageBuffer() {
|
2012-06-08 16:11:15 +02:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (ctx.inMsgBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-03 13:35:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextOutboundByteBuffer() {
|
|
|
|
return pipeline.hasNextOutboundByteBuffer(prev);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextOutboundMessageBuffer() {
|
|
|
|
return pipeline.hasNextOutboundMessageBuffer(prev);
|
|
|
|
}
|
|
|
|
|
2012-06-02 02:51:19 +02:00
|
|
|
@Override
|
2012-06-10 04:08:43 +02:00
|
|
|
public ByteBuf nextInboundByteBuffer() {
|
2012-06-08 16:11:15 +02:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
final Thread currentThread = Thread.currentThread();
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
|
|
|
if (ctx.inByteBuf != null) {
|
|
|
|
if (ctx.executor().inEventLoop(currentThread)) {
|
|
|
|
return ctx.inByteBuf;
|
|
|
|
} else {
|
2012-06-09 14:05:59 +02:00
|
|
|
ByteBridge bridge = ctx.inByteBridge.get();
|
2012-06-08 16:11:15 +02:00
|
|
|
if (bridge == null) {
|
2012-06-09 14:05:59 +02:00
|
|
|
bridge = new ByteBridge();
|
2012-06-08 16:11:15 +02:00
|
|
|
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
|
|
|
|
bridge = ctx.inByteBridge.get();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return bridge.byteBuf;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-11 03:43:47 +02:00
|
|
|
public MessageBuf<Object> nextInboundMessageBuffer() {
|
2012-06-08 16:11:15 +02:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
final Thread currentThread = Thread.currentThread();
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
throw new NoSuchBufferException();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ctx.inMsgBuf != null) {
|
|
|
|
if (ctx.executor().inEventLoop(currentThread)) {
|
|
|
|
return ctx.inMsgBuf;
|
|
|
|
} else {
|
|
|
|
MessageBridge bridge = ctx.inMsgBridge.get();
|
|
|
|
if (bridge == null) {
|
|
|
|
bridge = new MessageBridge();
|
|
|
|
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
|
|
|
|
bridge = ctx.inMsgBridge.get();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return bridge.msgBuf;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 04:08:43 +02:00
|
|
|
public ByteBuf nextOutboundByteBuffer() {
|
2012-06-07 12:39:37 +02:00
|
|
|
return pipeline.nextOutboundByteBuffer(prev);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-11 03:43:47 +02:00
|
|
|
public MessageBuf<Object> nextOutboundMessageBuffer() {
|
2012-06-07 12:39:37 +02:00
|
|
|
return pipeline.nextOutboundMessageBuffer(prev);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelRegistered() {
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelRegisteredTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelRegisteredTask);
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelUnregistered() {
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelUnregisteredTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelUnregisteredTask);
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelActive() {
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelActiveTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelActiveTask);
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelInactive() {
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelInactiveTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelInactiveTask);
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-08 16:11:15 +02:00
|
|
|
public void fireExceptionCaught(final Throwable cause) {
|
|
|
|
if (cause == null) {
|
|
|
|
throw new NullPointerException("cause");
|
|
|
|
}
|
|
|
|
|
2012-06-07 07:52:33 +02:00
|
|
|
DefaultChannelHandlerContext next = this.next;
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
try {
|
|
|
|
next.handler().exceptionCaught(next, cause);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
if (logger.isWarnEnabled()) {
|
|
|
|
logger.warn(
|
|
|
|
"An exception was thrown by a user handler's " +
|
|
|
|
"exceptionCaught() method while handling the following exception:", cause);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
2012-06-12 10:25:27 +02:00
|
|
|
try {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
fireExceptionCaught(cause);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Throwable t) {
|
|
|
|
if (logger.isWarnEnabled()) {
|
|
|
|
logger.warn("Failed to submit an exceptionCaught() event.", t);
|
|
|
|
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
|
2012-06-08 16:11:15 +02:00
|
|
|
}
|
2012-06-12 10:25:27 +02:00
|
|
|
}
|
2012-06-08 16:11:15 +02:00
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
} else {
|
2012-06-08 16:11:15 +02:00
|
|
|
logger.warn(
|
|
|
|
"An exceptionCaught() event was fired, and it reached at the end of the " +
|
|
|
|
"pipeline. It usually means the last inbound handler in the pipeline did not " +
|
|
|
|
"handle the exception.", cause);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-08 16:11:15 +02:00
|
|
|
public void fireUserEventTriggered(final Object event) {
|
|
|
|
if (event == null) {
|
|
|
|
throw new NullPointerException("event");
|
|
|
|
}
|
|
|
|
|
2012-06-07 07:52:33 +02:00
|
|
|
DefaultChannelHandlerContext next = this.next;
|
2012-06-02 02:51:19 +02:00
|
|
|
if (next != null) {
|
2012-06-08 16:11:15 +02:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
try {
|
|
|
|
next.handler().userEventTriggered(next, event);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
fireUserEventTriggered(event);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireInboundBufferUpdated() {
|
2012-06-04 09:24:34 +02:00
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
nextCtxFireInboundBufferUpdatedTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(nextCtxFireInboundBufferUpdatedTask);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture bind(SocketAddress localAddress) {
|
|
|
|
return bind(localAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress) {
|
|
|
|
return connect(remoteAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
|
|
|
return connect(remoteAddress, localAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture disconnect() {
|
|
|
|
return disconnect(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture close() {
|
|
|
|
return close(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture deregister() {
|
|
|
|
return deregister(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture flush() {
|
|
|
|
return flush(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message) {
|
|
|
|
return write(message, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
|
2012-06-09 02:44:30 +02:00
|
|
|
return pipeline.bind(nextContext(prev, DIR_OUTBOUND), localAddress, future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
|
|
|
|
return connect(remoteAddress, null, future);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
|
2012-06-09 02:44:30 +02:00
|
|
|
return pipeline.connect(nextContext(prev, DIR_OUTBOUND), remoteAddress, localAddress, future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture disconnect(ChannelFuture future) {
|
2012-06-09 02:44:30 +02:00
|
|
|
return pipeline.disconnect(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture close(ChannelFuture future) {
|
2012-06-09 02:44:30 +02:00
|
|
|
return pipeline.close(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture deregister(ChannelFuture future) {
|
2012-06-09 02:44:30 +02:00
|
|
|
return pipeline.deregister(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-04 09:24:34 +02:00
|
|
|
public ChannelFuture flush(final ChannelFuture future) {
|
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
2012-06-09 02:44:30 +02:00
|
|
|
DefaultChannelHandlerContext prev = nextContext(this.prev, DIR_OUTBOUND);
|
2012-06-04 09:24:34 +02:00
|
|
|
prev.fillBridge();
|
|
|
|
pipeline.flush(prev, future);
|
|
|
|
} else {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
flush(future);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
return future;
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message, ChannelFuture future) {
|
2012-06-03 13:25:03 +02:00
|
|
|
return pipeline.write(prev, message, future);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture newFuture() {
|
2012-06-02 03:34:19 +02:00
|
|
|
return channel.newFuture();
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture newSucceededFuture() {
|
2012-06-02 03:34:19 +02:00
|
|
|
return channel.newSucceededFuture();
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
2012-06-02 03:34:19 +02:00
|
|
|
return channel.newFailedFuture(cause);
|
2012-06-02 02:51:19 +02:00
|
|
|
}
|
2012-06-04 09:24:34 +02:00
|
|
|
|
|
|
|
static final class MessageBridge {
|
2012-06-11 10:02:00 +02:00
|
|
|
final MessageBuf<Object> msgBuf = Unpooled.messageBuffer();
|
2012-06-04 09:24:34 +02:00
|
|
|
final BlockingQueue<Object[]> exchangeBuf = QueueFactory.createQueue();
|
|
|
|
|
|
|
|
void fill() {
|
|
|
|
if (msgBuf.isEmpty()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Object[] data = msgBuf.toArray();
|
|
|
|
msgBuf.clear();
|
|
|
|
exchangeBuf.add(data);
|
|
|
|
}
|
|
|
|
|
2012-06-11 03:43:47 +02:00
|
|
|
void flush(MessageBuf<Object> out) {
|
2012-06-04 09:24:34 +02:00
|
|
|
for (;;) {
|
|
|
|
Object[] data = exchangeBuf.poll();
|
|
|
|
if (data == null) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2012-06-11 15:54:28 +02:00
|
|
|
Collections.addAll(out, data);
|
2012-06-04 09:24:34 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-09 14:05:59 +02:00
|
|
|
static final class ByteBridge {
|
2012-06-11 10:02:00 +02:00
|
|
|
final ByteBuf byteBuf = Unpooled.dynamicBuffer();
|
2012-06-10 04:08:43 +02:00
|
|
|
final BlockingQueue<ByteBuf> exchangeBuf = QueueFactory.createQueue();
|
2012-06-04 09:24:34 +02:00
|
|
|
|
|
|
|
void fill() {
|
|
|
|
if (!byteBuf.readable()) {
|
|
|
|
return;
|
|
|
|
}
|
2012-06-10 04:08:43 +02:00
|
|
|
ByteBuf data = byteBuf.readBytes(byteBuf.readableBytes());
|
2012-06-04 09:24:34 +02:00
|
|
|
byteBuf.discardReadBytes();
|
|
|
|
exchangeBuf.add(data);
|
|
|
|
}
|
|
|
|
|
2012-06-10 04:08:43 +02:00
|
|
|
void flush(ByteBuf out) {
|
2012-06-04 09:24:34 +02:00
|
|
|
for (;;) {
|
2012-06-10 04:08:43 +02:00
|
|
|
ByteBuf data = exchangeBuf.poll();
|
2012-06-04 09:24:34 +02:00
|
|
|
if (data == null) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
out.writeBytes(data);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2012-06-04 22:43:02 +02:00
|
|
|
}
|