Merge branch 'master' into pseudorandom-channel-IDs

This commit is contained in:
Cruz Julian Bishop 2012-08-25 13:34:17 +10:00
commit ad7f7a2f25
6 changed files with 19 additions and 7 deletions

View File

@ -1166,12 +1166,17 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
components.subList(0, firstComponentId).clear();
// Replace the first readable component with a new slice.
// Remove or replace the first readable component with a new slice.
Component c = components.get(0);
int adjustment = readerIndex - c.offset;
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
c.buf.unsafe().release();
components.set(0, newC);
if (adjustment == c.length) {
// new slice would be empty, so remove instead
components.remove(0);
} else {
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
c.buf.unsafe().release();
components.set(0, newC);
}
// Update indexes and markers.
updateComponentOffsets(0);

View File

@ -34,7 +34,7 @@ public abstract class ChannelInboundByteHandlerAdapter
inboundBufferUpdated(ctx, in);
} finally {
if (!in.readable()) {
in.discardReadBytes();
in.unsafe().discardSomeReadBytes();
}
}
}

View File

@ -28,6 +28,8 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
firstMessageReceived(ctx);
MessageBuf<I> in = ctx.inboundMessageBuffer();
for (;;) {
I msg = in.poll();
@ -40,7 +42,11 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
ctx.fireExceptionCaught(t);
}
}
lastMessageReceived(ctx);
}
public void firstMessageReceived(ChannelHandlerContext ctx) throws Exception { }
public abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
public void lastMessageReceived(ChannelHandlerContext ctx) throws Exception { }
}

View File

@ -46,6 +46,7 @@ final class AioServerSocketChannelConfig extends DefaultChannelConfig
}
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());

View File

@ -326,7 +326,6 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
// This is needed as the ByteBuffer and the ByteBuf does not share
// each others index
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
expandReadBuffer(byteBuf);
read = true;
} else if (localReadAmount < 0) {
@ -390,9 +389,9 @@ public class AioSocketChannel extends AbstractAioChannel implements SocketChanne
@Override
protected void completed0(Void result, AioSocketChannel channel) {
channel.beginRead();
((AbstractAioUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive();
channel.beginRead();
}
@Override

View File

@ -57,6 +57,7 @@ final class AioSocketChannelConfig extends DefaultChannelConfig
}
@Override
@SuppressWarnings("unchecked")
public <T> T getOption(ChannelOption<T> option) {
if (option == SO_RCVBUF) {
return (T) Integer.valueOf(getReceiveBufferSize());