2012-06-04 13:31:44 -07:00
|
|
|
/*
|
|
|
|
* Copyright 2012 The Netty Project
|
|
|
|
*
|
|
|
|
* The Netty Project licenses this file to you under the Apache License,
|
|
|
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
|
|
|
* with the License. You may obtain a copy of the License at:
|
|
|
|
*
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
*
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
* License for the specific language governing permissions and limitations
|
|
|
|
* under the License.
|
|
|
|
*/
|
2012-06-01 17:51:19 -07:00
|
|
|
package io.netty.channel;
|
|
|
|
|
2012-06-10 11:08:43 +09:00
|
|
|
import io.netty.buffer.ByteBuf;
|
2012-11-16 06:04:37 +09:00
|
|
|
import io.netty.buffer.ByteBufAllocator;
|
2012-06-10 12:22:32 +09:00
|
|
|
import io.netty.buffer.ChannelBuf;
|
|
|
|
import io.netty.buffer.MessageBuf;
|
2012-06-11 17:02:00 +09:00
|
|
|
import io.netty.buffer.Unpooled;
|
2012-06-01 17:51:19 -07:00
|
|
|
import io.netty.util.DefaultAttributeMap;
|
|
|
|
|
|
|
|
import java.net.SocketAddress;
|
2012-06-07 14:52:33 +09:00
|
|
|
import java.util.Collections;
|
|
|
|
import java.util.EnumSet;
|
2012-07-30 08:01:46 +02:00
|
|
|
import java.util.Queue;
|
2012-06-07 14:52:33 +09:00
|
|
|
import java.util.Set;
|
2012-11-12 14:55:05 -08:00
|
|
|
import java.util.concurrent.Callable;
|
2012-07-30 08:01:46 +02:00
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
2012-11-12 14:55:05 -08:00
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
|
import java.util.concurrent.Future;
|
2012-07-27 20:02:47 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
2012-06-03 18:51:42 -07:00
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
2012-06-01 17:51:19 -07:00
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
import static io.netty.channel.DefaultChannelPipeline.*;
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
|
|
|
|
|
|
|
|
private static final EnumSet<ChannelHandlerType> EMPTY_TYPE = EnumSet.noneOf(ChannelHandlerType.class);
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
static final int DIR_INBOUND = 1;
|
|
|
|
static final int DIR_OUTBOUND = 2;
|
|
|
|
|
|
|
|
private static final int FLAG_NEEDS_LAZY_INIT = 4;
|
2012-06-09 09:44:30 +09:00
|
|
|
|
2012-06-01 17:51:19 -07:00
|
|
|
volatile DefaultChannelHandlerContext next;
|
|
|
|
volatile DefaultChannelHandlerContext prev;
|
2012-11-16 06:04:37 +09:00
|
|
|
|
2012-06-01 18:34:19 -07:00
|
|
|
private final Channel channel;
|
2012-06-01 17:51:19 -07:00
|
|
|
private final DefaultChannelPipeline pipeline;
|
|
|
|
private final String name;
|
2012-06-09 09:44:30 +09:00
|
|
|
private final Set<ChannelHandlerType> type;
|
2012-06-01 17:51:19 -07:00
|
|
|
private final ChannelHandler handler;
|
2012-11-16 06:04:37 +09:00
|
|
|
final int flags;
|
|
|
|
final AtomicBoolean readable = new AtomicBoolean(true);
|
2012-06-07 14:52:33 +09:00
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
EventExecutor executor; // not thread-safe but OK because it never changes once set.
|
|
|
|
|
|
|
|
private MessageBuf<Object> inMsgBuf;
|
2012-12-05 19:28:56 +09:00
|
|
|
private ByteBuf inByteBuf;
|
2012-11-16 06:04:37 +09:00
|
|
|
private MessageBuf<Object> outMsgBuf;
|
2012-12-05 19:28:56 +09:00
|
|
|
private ByteBuf outByteBuf;
|
2012-06-03 18:51:42 -07:00
|
|
|
|
|
|
|
// When the two handlers run in a different thread and they are next to each other,
|
2012-06-04 00:24:34 -07:00
|
|
|
// each other's buffers can be accessed at the same time resulting in a race condition.
|
2012-06-03 18:51:42 -07:00
|
|
|
// To avoid such situation, we lazily creates an additional thread-safe buffer called
|
|
|
|
// 'bridge' so that the two handlers access each other's buffer only via the bridges.
|
|
|
|
// The content written into a bridge is flushed into the actual buffer by flushBridge().
|
2012-11-16 06:04:37 +09:00
|
|
|
private final AtomicReference<MessageBridge> inMsgBridge;
|
|
|
|
AtomicReference<MessageBridge> outMsgBridge;
|
|
|
|
private final AtomicReference<ByteBridge> inByteBridge;
|
|
|
|
AtomicReference<ByteBridge> outByteBridge;
|
2012-07-27 20:02:47 +02:00
|
|
|
|
2012-06-01 17:51:19 -07:00
|
|
|
// Runnables that call handlers
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable fireChannelRegisteredTask = new Runnable() {
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 14:52:33 +09:00
|
|
|
((ChannelStateHandler) ctx.handler).channelRegistered(ctx);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable fireChannelUnregisteredTask = new Runnable() {
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 14:52:33 +09:00
|
|
|
((ChannelStateHandler) ctx.handler).channelUnregistered(ctx);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable fireChannelActiveTask = new Runnable() {
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 14:52:33 +09:00
|
|
|
((ChannelStateHandler) ctx.handler).channelActive(ctx);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable fireChannelInactiveTask = new Runnable() {
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
try {
|
2012-06-07 14:52:33 +09:00
|
|
|
((ChannelStateHandler) ctx.handler).channelInactive(ctx);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable curCtxFireInboundBufferUpdatedTask = new Runnable() {
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
2012-06-03 18:51:42 -07:00
|
|
|
flushBridge();
|
2012-06-01 17:51:19 -07:00
|
|
|
try {
|
2012-06-07 16:56:21 +09:00
|
|
|
((ChannelStateHandler) ctx.handler).inboundBufferUpdated(ctx);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
} finally {
|
2012-06-10 11:08:43 +09:00
|
|
|
ByteBuf buf = inByteBuf;
|
2012-06-07 14:52:33 +09:00
|
|
|
if (buf != null) {
|
2012-06-04 11:56:00 -07:00
|
|
|
if (!buf.readable()) {
|
|
|
|
buf.discardReadBytes();
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-06-04 00:24:34 -07:00
|
|
|
private final Runnable nextCtxFireInboundBufferUpdatedTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
2012-06-07 14:52:33 +09:00
|
|
|
DefaultChannelHandlerContext next = nextContext(
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext.this.next, DIR_INBOUND);
|
2012-06-04 00:24:34 -07:00
|
|
|
if (next != null) {
|
|
|
|
next.fillBridge();
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.curCtxFireInboundBufferUpdatedTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.curCtxFireInboundBufferUpdatedTask);
|
|
|
|
}
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Runnable freeInboundBufferTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
if (ctx.handler instanceof ChannelInboundHandler) {
|
|
|
|
ChannelInboundHandler h = (ChannelInboundHandler) ctx.handler;
|
|
|
|
try {
|
2012-12-06 19:36:53 +01:00
|
|
|
if (ctx.hasInboundByteBuffer()) {
|
|
|
|
if (ctx.inByteBuf != null) {
|
|
|
|
h.freeInboundBuffer(ctx, ctx.inByteBuf);
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
} else {
|
2012-12-06 19:36:53 +01:00
|
|
|
if (ctx.inMsgBuf != null) {
|
|
|
|
h.freeInboundBuffer(ctx, ctx.inMsgBuf);
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
}
|
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
DefaultChannelHandlerContext nextCtx = nextContext(ctx.next, DIR_INBOUND);
|
|
|
|
if (nextCtx != null) {
|
|
|
|
nextCtx.callFreeInboundBuffer();
|
|
|
|
} else {
|
|
|
|
// Freed all inbound buffers. Free all outbound buffers in a reverse order.
|
|
|
|
pipeline.firstContext(DIR_OUTBOUND).callFreeOutboundBuffer();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
private final Runnable freeOutboundBufferTask = new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
DefaultChannelHandlerContext ctx = DefaultChannelHandlerContext.this;
|
|
|
|
if (ctx.handler instanceof ChannelOutboundHandler) {
|
|
|
|
ChannelOutboundHandler h = (ChannelOutboundHandler) ctx.handler;
|
|
|
|
try {
|
2012-12-06 19:36:53 +01:00
|
|
|
if (ctx.hasOutboundByteBuffer()) {
|
|
|
|
if (ctx.outByteBuf != null) {
|
|
|
|
h.freeOutboundBuffer(ctx, ctx.outByteBuf);
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
} else {
|
2012-12-06 19:36:53 +01:00
|
|
|
if (ctx.outMsgBuf != null) {
|
|
|
|
h.freeOutboundBuffer(ctx, ctx.outMsgBuf);
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
}
|
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
DefaultChannelHandlerContext nextCtx = nextContext(ctx.prev, DIR_OUTBOUND);
|
|
|
|
if (nextCtx != null) {
|
|
|
|
nextCtx.callFreeOutboundBuffer();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
2012-06-01 17:51:19 -07:00
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
DefaultChannelHandlerContext(
|
2012-08-10 20:17:18 +09:00
|
|
|
DefaultChannelPipeline pipeline, EventExecutorGroup group,
|
2012-06-01 17:51:19 -07:00
|
|
|
DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
|
|
|
|
String name, ChannelHandler handler) {
|
|
|
|
|
|
|
|
if (name == null) {
|
|
|
|
throw new NullPointerException("name");
|
|
|
|
}
|
|
|
|
if (handler == null) {
|
|
|
|
throw new NullPointerException("handler");
|
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
int flags = 0;
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
// Determine the type of the specified handler.
|
|
|
|
EnumSet<ChannelHandlerType> type = EMPTY_TYPE.clone();
|
|
|
|
if (handler instanceof ChannelStateHandler) {
|
|
|
|
type.add(ChannelHandlerType.STATE);
|
2012-11-16 06:04:37 +09:00
|
|
|
flags |= DIR_INBOUND;
|
2012-06-09 09:44:30 +09:00
|
|
|
if (handler instanceof ChannelInboundHandler) {
|
|
|
|
type.add(ChannelHandlerType.INBOUND);
|
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
|
|
|
if (handler instanceof ChannelOperationHandler) {
|
|
|
|
type.add(ChannelHandlerType.OPERATION);
|
2012-11-16 06:04:37 +09:00
|
|
|
flags |= DIR_OUTBOUND;
|
2012-06-09 09:44:30 +09:00
|
|
|
if (handler instanceof ChannelOutboundHandler) {
|
|
|
|
type.add(ChannelHandlerType.OUTBOUND);
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
this.type = Collections.unmodifiableSet(type);
|
2012-06-01 17:51:19 -07:00
|
|
|
|
|
|
|
this.prev = prev;
|
|
|
|
this.next = next;
|
|
|
|
|
2012-06-01 18:34:19 -07:00
|
|
|
channel = pipeline.channel;
|
2012-06-01 17:51:19 -07:00
|
|
|
this.pipeline = pipeline;
|
|
|
|
this.name = name;
|
|
|
|
this.handler = handler;
|
|
|
|
|
2012-08-10 20:17:18 +09:00
|
|
|
if (group != null) {
|
2012-06-01 17:51:19 -07:00
|
|
|
// Pin one of the child executors once and remember it so that the same child executor
|
|
|
|
// is used to fire events for the same channel.
|
2012-08-10 20:17:18 +09:00
|
|
|
EventExecutor childExecutor = pipeline.childExecutors.get(group);
|
2012-06-01 17:51:19 -07:00
|
|
|
if (childExecutor == null) {
|
2012-08-10 20:17:18 +09:00
|
|
|
childExecutor = group.next();
|
|
|
|
pipeline.childExecutors.put(group, childExecutor);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
2012-08-10 20:17:18 +09:00
|
|
|
executor = childExecutor;
|
2012-06-04 11:59:31 -07:00
|
|
|
} else if (channel.isRegistered()) {
|
2012-08-10 20:17:18 +09:00
|
|
|
executor = channel.eventLoop();
|
2012-06-01 17:51:19 -07:00
|
|
|
} else {
|
2012-08-10 20:17:18 +09:00
|
|
|
executor = null;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
if (type.contains(ChannelHandlerType.INBOUND)) {
|
2012-06-10 12:22:32 +09:00
|
|
|
ChannelBuf buf;
|
2012-06-01 17:51:19 -07:00
|
|
|
try {
|
2012-06-10 12:22:32 +09:00
|
|
|
buf = ((ChannelInboundHandler) handler).newInboundBuffer(this);
|
2012-06-01 17:51:19 -07:00
|
|
|
} catch (Exception e) {
|
|
|
|
throw new ChannelPipelineException("A user handler failed to create a new inbound buffer.", e);
|
|
|
|
}
|
2012-06-03 18:51:42 -07:00
|
|
|
|
2012-06-10 12:22:32 +09:00
|
|
|
if (buf == null) {
|
|
|
|
throw new ChannelPipelineException("A user handler's newInboundBuffer() returned null");
|
|
|
|
}
|
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
if (buf instanceof ByteBuf) {
|
|
|
|
inByteBuf = (ByteBuf) buf;
|
2012-06-09 21:05:59 +09:00
|
|
|
inByteBridge = new AtomicReference<ByteBridge>();
|
2012-06-07 14:52:33 +09:00
|
|
|
inMsgBuf = null;
|
|
|
|
inMsgBridge = null;
|
2012-06-10 12:22:32 +09:00
|
|
|
} else if (buf instanceof MessageBuf) {
|
2012-06-07 14:52:33 +09:00
|
|
|
inByteBuf = null;
|
2012-06-03 18:51:42 -07:00
|
|
|
inByteBridge = null;
|
2012-06-11 10:43:47 +09:00
|
|
|
inMsgBuf = (MessageBuf<Object>) buf;
|
2012-06-07 14:52:33 +09:00
|
|
|
inMsgBridge = new AtomicReference<MessageBridge>();
|
2012-06-10 12:22:32 +09:00
|
|
|
} else {
|
2012-06-09 09:44:30 +09:00
|
|
|
throw new Error();
|
2012-06-03 18:51:42 -07:00
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
} else {
|
2012-06-07 14:52:33 +09:00
|
|
|
inByteBuf = null;
|
2012-06-03 18:51:42 -07:00
|
|
|
inByteBridge = null;
|
2012-06-07 14:52:33 +09:00
|
|
|
inMsgBuf = null;
|
2012-06-03 18:51:42 -07:00
|
|
|
inMsgBridge = null;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
|
|
|
|
if (type.contains(ChannelHandlerType.OUTBOUND)) {
|
2012-11-16 06:04:37 +09:00
|
|
|
if (prev == null) {
|
|
|
|
// Special case: if pref == null, it means this context for HeadHandler.
|
|
|
|
// HeadHandler is an outbound handler instantiated by the constructor of DefaultChannelPipeline.
|
|
|
|
// Because Channel is not really fully initialized at this point, we should not call
|
|
|
|
// newOutboundBuffer() yet because it will usually lead to NPE.
|
|
|
|
// To work around this problem, we lazily initialize the outbound buffer for this special case.
|
|
|
|
flags |= FLAG_NEEDS_LAZY_INIT;
|
2012-06-10 12:22:32 +09:00
|
|
|
} else {
|
2012-11-16 06:04:37 +09:00
|
|
|
initOutboundBuffer();
|
2012-06-03 18:51:42 -07:00
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
} else {
|
2012-06-07 14:52:33 +09:00
|
|
|
outByteBuf = null;
|
2012-06-03 18:51:42 -07:00
|
|
|
outByteBridge = null;
|
2012-06-07 14:52:33 +09:00
|
|
|
outMsgBuf = null;
|
2012-06-03 18:51:42 -07:00
|
|
|
outMsgBridge = null;
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
|
|
|
|
this.flags = flags;
|
|
|
|
}
|
|
|
|
|
|
|
|
private void lazyInitOutboundBuffer() {
|
|
|
|
if ((flags & FLAG_NEEDS_LAZY_INIT) != 0) {
|
|
|
|
if (outByteBuf == null && outMsgBuf == null) {
|
2012-11-26 16:58:29 +09:00
|
|
|
EventExecutor exec = executor();
|
|
|
|
if (exec.inEventLoop()) {
|
|
|
|
initOutboundBuffer();
|
|
|
|
} else {
|
|
|
|
try {
|
|
|
|
getFromFuture(exec.submit(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
lazyInitOutboundBuffer();
|
|
|
|
}
|
|
|
|
}));
|
|
|
|
} catch (Exception e) {
|
|
|
|
throw new ChannelPipelineException("failed to initialize an outbound buffer lazily", e);
|
|
|
|
}
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private void initOutboundBuffer() {
|
|
|
|
ChannelBuf buf;
|
|
|
|
try {
|
|
|
|
buf = ((ChannelOutboundHandler) handler).newOutboundBuffer(this);
|
|
|
|
} catch (Exception e) {
|
|
|
|
throw new ChannelPipelineException("A user handler failed to create a new outbound buffer.", e);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (buf == null) {
|
|
|
|
throw new ChannelPipelineException("A user handler's newOutboundBuffer() returned null");
|
|
|
|
}
|
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
if (buf instanceof ByteBuf) {
|
|
|
|
outByteBuf = (ByteBuf) buf;
|
2012-11-16 06:04:37 +09:00
|
|
|
outByteBridge = new AtomicReference<ByteBridge>();
|
|
|
|
outMsgBuf = null;
|
|
|
|
outMsgBridge = null;
|
|
|
|
} else if (buf instanceof MessageBuf) {
|
|
|
|
outByteBuf = null;
|
|
|
|
outByteBridge = null;
|
2012-11-30 22:49:51 +09:00
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
MessageBuf<Object> msgBuf = (MessageBuf<Object>) buf;
|
|
|
|
outMsgBuf = msgBuf;
|
2012-11-16 06:04:37 +09:00
|
|
|
outMsgBridge = new AtomicReference<MessageBridge>();
|
|
|
|
} else {
|
|
|
|
throw new Error();
|
|
|
|
}
|
2012-06-03 18:51:42 -07:00
|
|
|
}
|
|
|
|
|
2012-06-04 00:24:34 -07:00
|
|
|
void fillBridge() {
|
|
|
|
if (inMsgBridge != null) {
|
|
|
|
MessageBridge bridge = inMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
} else if (inByteBridge != null) {
|
2012-06-09 21:05:59 +09:00
|
|
|
ByteBridge bridge = inByteBridge.get();
|
2012-06-04 00:24:34 -07:00
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (outMsgBridge != null) {
|
|
|
|
MessageBridge bridge = outMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
} else if (outByteBridge != null) {
|
2012-06-09 21:05:59 +09:00
|
|
|
ByteBridge bridge = outByteBridge.get();
|
2012-06-04 00:24:34 -07:00
|
|
|
if (bridge != null) {
|
|
|
|
bridge.fill();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-03 18:51:42 -07:00
|
|
|
void flushBridge() {
|
|
|
|
if (inMsgBridge != null) {
|
2012-06-04 00:24:34 -07:00
|
|
|
MessageBridge bridge = inMsgBridge.get();
|
2012-06-03 18:51:42 -07:00
|
|
|
if (bridge != null) {
|
2012-06-07 14:52:33 +09:00
|
|
|
bridge.flush(inMsgBuf);
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
} else if (inByteBridge != null) {
|
2012-06-09 21:05:59 +09:00
|
|
|
ByteBridge bridge = inByteBridge.get();
|
2012-06-04 00:24:34 -07:00
|
|
|
if (bridge != null) {
|
2012-06-07 14:52:33 +09:00
|
|
|
bridge.flush(inByteBuf);
|
2012-06-03 18:51:42 -07:00
|
|
|
}
|
|
|
|
}
|
2012-06-04 00:24:34 -07:00
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
lazyInitOutboundBuffer();
|
2012-06-03 18:51:42 -07:00
|
|
|
if (outMsgBridge != null) {
|
2012-06-04 00:24:34 -07:00
|
|
|
MessageBridge bridge = outMsgBridge.get();
|
|
|
|
if (bridge != null) {
|
2012-06-07 14:52:33 +09:00
|
|
|
bridge.flush(outMsgBuf);
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
} else if (outByteBridge != null) {
|
2012-06-09 21:05:59 +09:00
|
|
|
ByteBridge bridge = outByteBridge.get();
|
2012-06-03 18:51:42 -07:00
|
|
|
if (bridge != null) {
|
2012-06-07 14:52:33 +09:00
|
|
|
bridge.flush(outByteBuf);
|
2012-06-03 18:51:42 -07:00
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Channel channel() {
|
2012-06-01 18:34:19 -07:00
|
|
|
return channel;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelPipeline pipeline() {
|
|
|
|
return pipeline;
|
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
@Override
|
|
|
|
public ByteBufAllocator alloc() {
|
|
|
|
return channel.config().getAllocator();
|
|
|
|
}
|
|
|
|
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public EventExecutor executor() {
|
|
|
|
if (executor == null) {
|
2012-06-04 00:24:34 -07:00
|
|
|
return executor = channel.eventLoop();
|
2012-06-01 17:51:19 -07:00
|
|
|
} else {
|
|
|
|
return executor;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelHandler handler() {
|
|
|
|
return handler;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String name() {
|
|
|
|
return name;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-08-28 01:19:45 +02:00
|
|
|
public Set<ChannelHandlerType> types() {
|
2012-06-07 14:52:33 +09:00
|
|
|
return type;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasInboundByteBuffer() {
|
|
|
|
return inByteBuf != null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasInboundMessageBuffer() {
|
|
|
|
return inMsgBuf != null;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 11:08:43 +09:00
|
|
|
public ByteBuf inboundByteBuffer() {
|
2012-06-07 14:52:33 +09:00
|
|
|
if (inByteBuf == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (handler instanceof ChannelInboundHandler) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no inbound byte buffer; it implements %s, but " +
|
|
|
|
"its newInboundBuffer() method created a %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
MessageBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no inbound byte buffer; it does not implement %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName()));
|
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
|
|
|
return inByteBuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@SuppressWarnings("unchecked")
|
2012-06-11 10:43:47 +09:00
|
|
|
public <T> MessageBuf<T> inboundMessageBuffer() {
|
2012-06-07 14:52:33 +09:00
|
|
|
if (inMsgBuf == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (handler instanceof ChannelInboundHandler) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no inbound message buffer; it implements %s, but " +
|
|
|
|
"its newInboundBuffer() method created a %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
ByteBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no inbound message buffer; it does not implement %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName()));
|
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
2012-06-11 10:43:47 +09:00
|
|
|
return (MessageBuf<T>) inMsgBuf;
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasOutboundByteBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
lazyInitOutboundBuffer();
|
2012-06-07 14:52:33 +09:00
|
|
|
return outByteBuf != null;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-07 14:52:33 +09:00
|
|
|
public boolean hasOutboundMessageBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
lazyInitOutboundBuffer();
|
2012-06-07 14:52:33 +09:00
|
|
|
return outMsgBuf != null;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 11:08:43 +09:00
|
|
|
public ByteBuf outboundByteBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
if (outMsgBuf == null) {
|
|
|
|
lazyInitOutboundBuffer();
|
|
|
|
}
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
if (outByteBuf == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (handler instanceof ChannelOutboundHandler) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no outbound byte buffer; it implements %s, but " +
|
|
|
|
"its newOutboundBuffer() method created a %s.",
|
|
|
|
name, ChannelOutboundHandler.class.getSimpleName(),
|
|
|
|
MessageBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no outbound byte buffer; it does not implement %s.",
|
|
|
|
name, ChannelOutboundHandler.class.getSimpleName()));
|
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
|
|
|
return outByteBuf;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-07 14:52:33 +09:00
|
|
|
@SuppressWarnings("unchecked")
|
2012-06-11 10:43:47 +09:00
|
|
|
public <T> MessageBuf<T> outboundMessageBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
if (outMsgBuf == null) {
|
|
|
|
initOutboundBuffer();
|
|
|
|
}
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
if (outMsgBuf == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (handler instanceof ChannelOutboundHandler) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no outbound message buffer; it implements %s, but " +
|
|
|
|
"its newOutboundBuffer() method created a %s.",
|
|
|
|
name, ChannelOutboundHandler.class.getSimpleName(),
|
|
|
|
ByteBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' has no outbound message buffer; it does not implement %s.",
|
|
|
|
name, ChannelOutboundHandler.class.getSimpleName()));
|
|
|
|
}
|
2012-06-07 14:52:33 +09:00
|
|
|
}
|
2012-06-11 10:43:47 +09:00
|
|
|
return (MessageBuf<T>) outMsgBuf;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
2012-11-12 14:55:05 -08:00
|
|
|
/**
|
|
|
|
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
|
|
|
|
* current thread will be interrupted and this will return {@code null}. It is expected that the task
|
|
|
|
* performs any appropriate locking.
|
|
|
|
* <p>
|
|
|
|
* If the {@link Callable#call()} call throws a {@link Throwable}, but it is not an instance of
|
|
|
|
* {@link Error}, {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an
|
|
|
|
* {@link AssertionError} and that is thrown instead.</p>
|
|
|
|
*
|
|
|
|
* @param c execute this callable and return its value
|
|
|
|
* @param <T> the return value type
|
|
|
|
* @return the task's return value, or {@code null} if the task was interrupted.
|
|
|
|
* @see Callable#call()
|
|
|
|
* @see Future#get()
|
|
|
|
* @throws Error if the task threw this.
|
|
|
|
* @throws RuntimeException if the task threw this.
|
|
|
|
* @throws Exception if the task threw this.
|
|
|
|
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
|
|
|
* {@link Throwable}.
|
|
|
|
*/
|
2012-11-16 06:04:37 +09:00
|
|
|
private <T> T executeOnEventLoop(Callable<T> c) throws Exception {
|
2012-11-12 14:55:05 -08:00
|
|
|
return getFromFuture(executor().submit(c));
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Executes a task on the event loop and waits for it to finish. If the task is interrupted, then the
|
|
|
|
* current thread will be interrupted. It is expected that the task performs any appropriate locking.
|
|
|
|
* <p>
|
|
|
|
* If the {@link Runnable#run()} call throws a {@link Throwable}, but it is not an instance of
|
|
|
|
* {@link Error} or {@link RuntimeException}, then it is wrapped inside a
|
|
|
|
* {@link ChannelPipelineException} and that is thrown instead.</p>
|
|
|
|
*
|
|
|
|
* @param r execute this runnable
|
|
|
|
* @see Runnable#run()
|
|
|
|
* @see Future#get()
|
|
|
|
* @throws Error if the task threw this.
|
|
|
|
* @throws RuntimeException if the task threw this.
|
|
|
|
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
|
|
|
* {@link Throwable}.
|
|
|
|
*/
|
|
|
|
void executeOnEventLoop(Runnable r) {
|
|
|
|
waitForFuture(executor().submit(r));
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Waits for a future to finish and gets the result. If the task is interrupted, then the current thread
|
|
|
|
* will be interrupted and this will return {@code null}. It is expected that the task performs any
|
|
|
|
* appropriate locking.
|
|
|
|
* <p>
|
|
|
|
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error},
|
|
|
|
* {@link RuntimeException}, or {@link Exception}, then it is wrapped inside an {@link AssertionError}
|
|
|
|
* and that is thrown instead.</p>
|
|
|
|
*
|
|
|
|
* @param future wait for this future
|
|
|
|
* @param <T> the return value type
|
|
|
|
* @return the task's return value, or {@code null} if the task was interrupted.
|
|
|
|
* @see Future#get()
|
|
|
|
* @throws Error if the task threw this.
|
|
|
|
* @throws RuntimeException if the task threw this.
|
|
|
|
* @throws Exception if the task threw this.
|
|
|
|
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
|
|
|
* {@link Throwable}.
|
|
|
|
*/
|
2012-11-16 06:04:37 +09:00
|
|
|
private static <T> T getFromFuture(Future<T> future) throws Exception {
|
2012-11-12 14:55:05 -08:00
|
|
|
try {
|
|
|
|
return future.get();
|
|
|
|
} catch (ExecutionException ex) {
|
|
|
|
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
|
|
|
|
|
|
|
Throwable t = ex.getCause();
|
|
|
|
if (t instanceof Error) { throw (Error) t; }
|
|
|
|
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
|
|
|
|
if (t instanceof Exception) { throw (Exception) t; }
|
|
|
|
throw new ChannelPipelineException(t);
|
|
|
|
} catch (InterruptedException ex) {
|
|
|
|
// Interrupt the calling thread (note that this method is not called from the event loop)
|
|
|
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Waits for a future to finish. If the task is interrupted, then the current thread will be interrupted.
|
|
|
|
* It is expected that the task performs any appropriate locking.
|
|
|
|
* <p>
|
|
|
|
* If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
|
|
|
|
* {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
|
|
|
|
* thrown instead.</p>
|
|
|
|
*
|
|
|
|
* @param future wait for this future
|
|
|
|
* @see Future#get()
|
|
|
|
* @throws Error if the task threw this.
|
|
|
|
* @throws RuntimeException if the task threw this.
|
|
|
|
* @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
|
|
|
|
* {@link Throwable}.
|
|
|
|
*/
|
2012-11-16 06:04:37 +09:00
|
|
|
static void waitForFuture(Future<?> future) {
|
2012-11-12 14:55:05 -08:00
|
|
|
try {
|
|
|
|
future.get();
|
|
|
|
} catch (ExecutionException ex) {
|
|
|
|
// In the arbitrary case, we can throw Error, RuntimeException, and Exception
|
|
|
|
|
|
|
|
Throwable t = ex.getCause();
|
|
|
|
if (t instanceof Error) { throw (Error) t; }
|
|
|
|
if (t instanceof RuntimeException) { throw (RuntimeException) t; }
|
|
|
|
throw new ChannelPipelineException(t);
|
|
|
|
} catch (InterruptedException ex) {
|
|
|
|
// Interrupt the calling thread (note that this method is not called from the event loop)
|
|
|
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ByteBuf replaceInboundByteBuffer(final ByteBuf newInboundByteBuf) {
|
|
|
|
if (newInboundByteBuf == null) {
|
|
|
|
throw new NullPointerException("newInboundByteBuf");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!executor().inEventLoop()) {
|
|
|
|
try {
|
|
|
|
return executeOnEventLoop(new Callable<ByteBuf>() {
|
|
|
|
@Override
|
|
|
|
public ByteBuf call() {
|
|
|
|
return replaceInboundByteBuffer(newInboundByteBuf);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Exception ex) {
|
2012-11-26 16:58:29 +09:00
|
|
|
throw new ChannelPipelineException("failed to replace an inbound byte buffer", ex);
|
2012-11-12 14:55:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
ByteBuf currentInboundByteBuf = inboundByteBuffer();
|
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
inByteBuf = newInboundByteBuf;
|
2012-11-12 14:55:05 -08:00
|
|
|
return currentInboundByteBuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
public <T> MessageBuf<T> replaceInboundMessageBuffer(final MessageBuf<T> newInboundMsgBuf) {
|
|
|
|
if (newInboundMsgBuf == null) {
|
|
|
|
throw new NullPointerException("newInboundMsgBuf");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!executor().inEventLoop()) {
|
|
|
|
try {
|
|
|
|
return executeOnEventLoop(new Callable<MessageBuf<T>>() {
|
|
|
|
@Override
|
|
|
|
public MessageBuf<T> call() {
|
|
|
|
return replaceInboundMessageBuffer(newInboundMsgBuf);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Exception ex) {
|
2012-11-26 16:58:29 +09:00
|
|
|
throw new ChannelPipelineException("failed to replace an inbound message buffer", ex);
|
2012-11-12 14:55:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
MessageBuf<T> currentInboundMsgBuf = inboundMessageBuffer();
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
inMsgBuf = (MessageBuf<Object>) newInboundMsgBuf;
|
2012-11-12 14:55:05 -08:00
|
|
|
return currentInboundMsgBuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ByteBuf replaceOutboundByteBuffer(final ByteBuf newOutboundByteBuf) {
|
|
|
|
if (newOutboundByteBuf == null) {
|
|
|
|
throw new NullPointerException("newOutboundByteBuf");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!executor().inEventLoop()) {
|
|
|
|
try {
|
|
|
|
return executeOnEventLoop(new Callable<ByteBuf>() {
|
|
|
|
@Override
|
|
|
|
public ByteBuf call() {
|
|
|
|
return replaceOutboundByteBuffer(newOutboundByteBuf);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Exception ex) {
|
2012-11-26 16:58:29 +09:00
|
|
|
throw new ChannelPipelineException("failed to replace an outbound byte buffer", ex);
|
2012-11-12 14:55:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
ByteBuf currentOutboundByteBuf = outboundByteBuffer();
|
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
outByteBuf = newOutboundByteBuf;
|
2012-11-12 14:55:05 -08:00
|
|
|
return currentOutboundByteBuf;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
public <T> MessageBuf<T> replaceOutboundMessageBuffer(final MessageBuf<T> newOutboundMsgBuf) {
|
|
|
|
if (newOutboundMsgBuf == null) {
|
|
|
|
throw new NullPointerException("newOutboundMsgBuf");
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!executor().inEventLoop()) {
|
|
|
|
try {
|
|
|
|
return executeOnEventLoop(new Callable<MessageBuf<T>>() {
|
|
|
|
@Override
|
|
|
|
public MessageBuf<T> call() {
|
|
|
|
return replaceOutboundMessageBuffer(newOutboundMsgBuf);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Exception ex) {
|
2012-11-26 16:58:29 +09:00
|
|
|
throw new ChannelPipelineException("failed to replace an outbound message buffer", ex);
|
2012-11-12 14:55:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
MessageBuf<T> currentOutboundMsgBuf = outboundMessageBuffer();
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
outMsgBuf = (MessageBuf<Object>) newOutboundMsgBuf;
|
2012-11-12 14:55:05 -08:00
|
|
|
return currentOutboundMsgBuf;
|
|
|
|
}
|
|
|
|
|
2012-06-03 04:35:38 -07:00
|
|
|
@Override
|
|
|
|
public boolean hasNextInboundByteBuffer() {
|
2012-06-08 23:11:15 +09:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (ctx.inByteBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-03 04:35:38 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextInboundMessageBuffer() {
|
2012-06-08 23:11:15 +09:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (ctx.inMsgBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-03 04:35:38 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextOutboundByteBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
DefaultChannelHandlerContext ctx = prev;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx.lazyInitOutboundBuffer();
|
|
|
|
|
|
|
|
if (ctx.outByteBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.prev;
|
|
|
|
}
|
2012-06-03 04:35:38 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean hasNextOutboundMessageBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
DefaultChannelHandlerContext ctx = prev;
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx.lazyInitOutboundBuffer();
|
|
|
|
|
|
|
|
if (ctx.outMsgBridge != null) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
ctx = ctx.prev;
|
|
|
|
}
|
2012-06-03 04:35:38 -07:00
|
|
|
}
|
|
|
|
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
2012-06-10 11:08:43 +09:00
|
|
|
public ByteBuf nextInboundByteBuffer() {
|
2012-06-08 23:11:15 +09:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
final Thread currentThread = Thread.currentThread();
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (prev != null) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' could not find a %s whose inbound buffer is %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
ByteBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the pipeline does not contain a %s whose inbound buffer is %s.",
|
|
|
|
ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
ByteBuf.class.getSimpleName()));
|
|
|
|
}
|
2012-06-08 23:11:15 +09:00
|
|
|
}
|
|
|
|
if (ctx.inByteBuf != null) {
|
|
|
|
if (ctx.executor().inEventLoop(currentThread)) {
|
|
|
|
return ctx.inByteBuf;
|
|
|
|
} else {
|
2012-06-09 21:05:59 +09:00
|
|
|
ByteBridge bridge = ctx.inByteBridge.get();
|
2012-06-08 23:11:15 +09:00
|
|
|
if (bridge == null) {
|
2012-11-16 06:04:37 +09:00
|
|
|
bridge = new ByteBridge(ctx);
|
2012-06-08 23:11:15 +09:00
|
|
|
if (!ctx.inByteBridge.compareAndSet(null, bridge)) {
|
|
|
|
bridge = ctx.inByteBridge.get();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return bridge.byteBuf;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-11 10:43:47 +09:00
|
|
|
public MessageBuf<Object> nextInboundMessageBuffer() {
|
2012-06-08 23:11:15 +09:00
|
|
|
DefaultChannelHandlerContext ctx = next;
|
|
|
|
final Thread currentThread = Thread.currentThread();
|
|
|
|
for (;;) {
|
|
|
|
if (ctx == null) {
|
2012-08-20 21:03:23 +09:00
|
|
|
if (prev != null) {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the handler '%s' could not find a %s whose inbound buffer is %s.",
|
|
|
|
name, ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
MessageBuf.class.getSimpleName()));
|
|
|
|
} else {
|
|
|
|
throw new NoSuchBufferException(String.format(
|
|
|
|
"the pipeline does not contain a %s whose inbound buffer is %s.",
|
|
|
|
ChannelInboundHandler.class.getSimpleName(),
|
|
|
|
MessageBuf.class.getSimpleName()));
|
|
|
|
}
|
2012-06-08 23:11:15 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
if (ctx.inMsgBuf != null) {
|
|
|
|
if (ctx.executor().inEventLoop(currentThread)) {
|
|
|
|
return ctx.inMsgBuf;
|
|
|
|
} else {
|
|
|
|
MessageBridge bridge = ctx.inMsgBridge.get();
|
|
|
|
if (bridge == null) {
|
|
|
|
bridge = new MessageBridge();
|
|
|
|
if (!ctx.inMsgBridge.compareAndSet(null, bridge)) {
|
|
|
|
bridge = ctx.inMsgBridge.get();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return bridge.msgBuf;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ctx = ctx.next;
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-10 11:08:43 +09:00
|
|
|
public ByteBuf nextOutboundByteBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
return pipeline.nextOutboundByteBuffer(prev);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-11 10:43:47 +09:00
|
|
|
public MessageBuf<Object> nextOutboundMessageBuffer() {
|
2012-11-16 06:04:37 +09:00
|
|
|
return pipeline.nextOutboundMessageBuffer(prev);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelRegistered() {
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelRegisteredTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelRegisteredTask);
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelUnregistered() {
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelUnregisteredTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelUnregisteredTask);
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelActive() {
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelActiveTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelActiveTask);
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireChannelInactive() {
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext next = nextContext(this.next, DIR_INBOUND);
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
next.fireChannelInactiveTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(next.fireChannelInactiveTask);
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-08 23:11:15 +09:00
|
|
|
public void fireExceptionCaught(final Throwable cause) {
|
|
|
|
if (cause == null) {
|
|
|
|
throw new NullPointerException("cause");
|
|
|
|
}
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
DefaultChannelHandlerContext next = this.next;
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
try {
|
|
|
|
next.handler().exceptionCaught(next, cause);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
if (logger.isWarnEnabled()) {
|
|
|
|
logger.warn(
|
|
|
|
"An exception was thrown by a user handler's " +
|
|
|
|
"exceptionCaught() method while handling the following exception:", cause);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
2012-06-12 17:25:27 +09:00
|
|
|
try {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
fireExceptionCaught(cause);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
} catch (Throwable t) {
|
|
|
|
if (logger.isWarnEnabled()) {
|
|
|
|
logger.warn("Failed to submit an exceptionCaught() event.", t);
|
|
|
|
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
|
2012-06-08 23:11:15 +09:00
|
|
|
}
|
2012-06-12 17:25:27 +09:00
|
|
|
}
|
2012-06-08 23:11:15 +09:00
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
} else {
|
2012-06-08 23:11:15 +09:00
|
|
|
logger.warn(
|
|
|
|
"An exceptionCaught() event was fired, and it reached at the end of the " +
|
2012-11-16 06:04:37 +09:00
|
|
|
"pipeline. It usually means the last inbound handler in the pipeline did not " +
|
|
|
|
"handle the exception.", cause);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-08 23:11:15 +09:00
|
|
|
public void fireUserEventTriggered(final Object event) {
|
|
|
|
if (event == null) {
|
|
|
|
throw new NullPointerException("event");
|
|
|
|
}
|
|
|
|
|
2012-06-07 14:52:33 +09:00
|
|
|
DefaultChannelHandlerContext next = this.next;
|
2012-06-01 17:51:19 -07:00
|
|
|
if (next != null) {
|
2012-06-08 23:11:15 +09:00
|
|
|
EventExecutor executor = next.executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
try {
|
|
|
|
next.handler().userEventTriggered(next, event);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
pipeline.notifyHandlerException(t);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
fireUserEventTriggered(event);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void fireInboundBufferUpdated() {
|
2012-06-04 00:24:34 -07:00
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
nextCtxFireInboundBufferUpdatedTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(nextCtxFireInboundBufferUpdatedTask);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture bind(SocketAddress localAddress) {
|
|
|
|
return bind(localAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress) {
|
|
|
|
return connect(remoteAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
|
|
|
return connect(remoteAddress, localAddress, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture disconnect() {
|
|
|
|
return disconnect(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture close() {
|
|
|
|
return close(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture deregister() {
|
|
|
|
return deregister(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture flush() {
|
|
|
|
return flush(newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message) {
|
|
|
|
return write(message, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture bind(SocketAddress localAddress, ChannelFuture future) {
|
2012-06-09 09:44:30 +09:00
|
|
|
return pipeline.bind(nextContext(prev, DIR_OUTBOUND), localAddress, future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, ChannelFuture future) {
|
|
|
|
return connect(remoteAddress, null, future);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future) {
|
2012-06-09 09:44:30 +09:00
|
|
|
return pipeline.connect(nextContext(prev, DIR_OUTBOUND), remoteAddress, localAddress, future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture disconnect(ChannelFuture future) {
|
2012-06-09 09:44:30 +09:00
|
|
|
return pipeline.disconnect(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture close(ChannelFuture future) {
|
2012-06-09 09:44:30 +09:00
|
|
|
return pipeline.close(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture deregister(ChannelFuture future) {
|
2012-06-09 09:44:30 +09:00
|
|
|
return pipeline.deregister(nextContext(prev, DIR_OUTBOUND), future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2012-06-04 00:24:34 -07:00
|
|
|
public ChannelFuture flush(final ChannelFuture future) {
|
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
2012-06-09 09:44:30 +09:00
|
|
|
DefaultChannelHandlerContext prev = nextContext(this.prev, DIR_OUTBOUND);
|
2012-06-04 00:24:34 -07:00
|
|
|
prev.fillBridge();
|
|
|
|
pipeline.flush(prev, future);
|
|
|
|
} else {
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
@Override
|
|
|
|
public void run() {
|
|
|
|
flush(future);
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
return future;
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture write(Object message, ChannelFuture future) {
|
2012-06-03 04:25:03 -07:00
|
|
|
return pipeline.write(prev, message, future);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
void callFreeInboundBuffer() {
|
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
freeInboundBufferTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(freeInboundBufferTask);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/** Invocation initiated by {@link #freeInboundBufferTask} after freeing all inbound buffers. */
|
|
|
|
private void callFreeOutboundBuffer() {
|
|
|
|
EventExecutor executor = executor();
|
|
|
|
if (executor.inEventLoop()) {
|
|
|
|
freeOutboundBufferTask.run();
|
|
|
|
} else {
|
|
|
|
executor.execute(freeOutboundBufferTask);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-01 17:51:19 -07:00
|
|
|
@Override
|
|
|
|
public ChannelFuture newFuture() {
|
2012-06-01 18:34:19 -07:00
|
|
|
return channel.newFuture();
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture newSucceededFuture() {
|
2012-06-01 18:34:19 -07:00
|
|
|
return channel.newSucceededFuture();
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture newFailedFuture(Throwable cause) {
|
2012-06-01 18:34:19 -07:00
|
|
|
return channel.newFailedFuture(cause);
|
2012-06-01 17:51:19 -07:00
|
|
|
}
|
2012-06-04 00:24:34 -07:00
|
|
|
|
|
|
|
static final class MessageBridge {
|
2012-06-11 17:02:00 +09:00
|
|
|
final MessageBuf<Object> msgBuf = Unpooled.messageBuffer();
|
2012-06-04 00:24:34 -07:00
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
private final Queue<Object[]> exchangeBuf = new ConcurrentLinkedQueue<Object[]>();
|
|
|
|
|
|
|
|
private void fill() {
|
2012-06-04 00:24:34 -07:00
|
|
|
if (msgBuf.isEmpty()) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Object[] data = msgBuf.toArray();
|
|
|
|
msgBuf.clear();
|
|
|
|
exchangeBuf.add(data);
|
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
private void flush(MessageBuf<Object> out) {
|
2012-06-04 00:24:34 -07:00
|
|
|
for (;;) {
|
|
|
|
Object[] data = exchangeBuf.poll();
|
|
|
|
if (data == null) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2012-06-11 22:54:28 +09:00
|
|
|
Collections.addAll(out, data);
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-09 21:05:59 +09:00
|
|
|
static final class ByteBridge {
|
2012-12-05 19:28:56 +09:00
|
|
|
final ByteBuf byteBuf;
|
2012-11-16 06:04:37 +09:00
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
private final Queue<ByteBuf> exchangeBuf = new ConcurrentLinkedQueue<ByteBuf>();
|
2012-11-16 06:04:37 +09:00
|
|
|
private final ChannelHandlerContext ctx;
|
2012-06-04 00:24:34 -07:00
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
ByteBridge(ChannelHandlerContext ctx) {
|
|
|
|
this.ctx = ctx;
|
|
|
|
// TODO Choose whether to use heap or direct buffer depending on the context's buffer type.
|
2012-12-05 19:28:56 +09:00
|
|
|
byteBuf = ctx.alloc().buffer();
|
2012-11-16 06:04:37 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
private void fill() {
|
2012-06-04 00:24:34 -07:00
|
|
|
if (!byteBuf.readable()) {
|
|
|
|
return;
|
|
|
|
}
|
2012-11-16 06:04:37 +09:00
|
|
|
|
|
|
|
int dataLen = byteBuf.readableBytes();
|
|
|
|
ByteBuf data;
|
|
|
|
if (byteBuf.isDirect()) {
|
|
|
|
data = ctx.alloc().directBuffer(dataLen, dataLen);
|
|
|
|
} else {
|
|
|
|
data = ctx.alloc().buffer(dataLen, dataLen);
|
|
|
|
}
|
|
|
|
|
|
|
|
byteBuf.readBytes(data);
|
2012-12-05 19:28:56 +09:00
|
|
|
byteBuf.unsafe().discardSomeReadBytes();
|
2012-11-16 06:04:37 +09:00
|
|
|
|
2012-12-05 19:28:56 +09:00
|
|
|
exchangeBuf.add(data);
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
private void flush(ByteBuf out) {
|
2012-06-04 00:24:34 -07:00
|
|
|
for (;;) {
|
2012-12-05 19:28:56 +09:00
|
|
|
ByteBuf data = exchangeBuf.poll();
|
2012-06-04 00:24:34 -07:00
|
|
|
if (data == null) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2012-11-16 06:04:37 +09:00
|
|
|
try {
|
|
|
|
out.writeBytes(data);
|
|
|
|
} finally {
|
2012-12-05 19:28:56 +09:00
|
|
|
data.unsafe().free();
|
2012-11-16 06:04:37 +09:00
|
|
|
}
|
2012-06-04 00:24:34 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2012-07-27 20:02:47 +02:00
|
|
|
|
|
|
|
@Override
|
|
|
|
public boolean isReadable() {
|
2012-08-01 11:35:03 +02:00
|
|
|
return readable.get();
|
2012-07-27 20:02:47 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void readable(boolean readable) {
|
2012-08-10 20:17:18 +09:00
|
|
|
pipeline.readable(this, readable);
|
2012-07-27 20:02:47 +02:00
|
|
|
}
|
2012-10-24 18:27:26 +02:00
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture sendFile(FileRegion region) {
|
|
|
|
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, newFuture());
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public ChannelFuture sendFile(FileRegion region, ChannelFuture future) {
|
|
|
|
return pipeline.sendFile(nextContext(prev, DIR_OUTBOUND), region, future);
|
|
|
|
}
|
2012-06-04 13:43:02 -07:00
|
|
|
}
|