Prefer MessageBuf over Queue where possible

- Also replaced thread safe queues with non-thread-safe ones where
  possible
- Unpooled.wrappedBuffer(Queue<T>) does not wrap MessageBuf anymore
This commit is contained in:
Trustin Lee 2012-06-12 17:02:00 +09:00
parent e1faea035e
commit ecd0ae5406
25 changed files with 117 additions and 92 deletions

View File

@ -113,6 +113,9 @@ public final class Unpooled {
} }
public static <T> MessageBuf<T> wrappedBuffer(Queue<T> queue) { public static <T> MessageBuf<T> wrappedBuffer(Queue<T> queue) {
if (queue instanceof MessageBuf) {
return (MessageBuf<T>) queue;
}
return new QueueBackedMessageBuf<T>(queue); return new QueueBackedMessageBuf<T>(queue);
} }

View File

@ -20,8 +20,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelHandler; import io.netty.channel.CombinedChannelHandler;
import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.util.internal.QueueFactory;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class HttpClientCodec extends CombinedChannelHandler { public class HttpClientCodec extends CombinedChannelHandler {
/** A queue that is used for correlating a request and a response. */ /** A queue that is used for correlating a request and a response. */
final Queue<HttpMethod> queue = QueueFactory.createQueue(); final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
/** If true, decoding stops (i.e. pass-through) */ /** If true, decoding stops (i.e. pass-through) */
volatile boolean done; volatile boolean done;

View File

@ -20,8 +20,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedByteChannel; import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.internal.QueueFactory;
import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
/** /**
@ -48,7 +48,7 @@ import java.util.Queue;
*/ */
public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, HttpMessage, Object, Object> { public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessage, HttpMessage, Object, Object> {
private final Queue<String> acceptEncodingQueue = QueueFactory.createQueue(); private final Queue<String> acceptEncodingQueue = new ArrayDeque<String>();
private EmbeddedByteChannel encoder; private EmbeddedByteChannel encoder;
/** /**

View File

@ -15,12 +15,14 @@
*/ */
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.QueueFactory;
import java.util.Comparator; import java.util.Comparator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
final class SpdySession { final class SpdySession {
@ -176,8 +178,7 @@ final class SpdySession {
private final AtomicInteger sendWindowSize; private final AtomicInteger sendWindowSize;
private final AtomicInteger receiveWindowSize; private final AtomicInteger receiveWindowSize;
private volatile int receiveWindowSizeLowerBound; private volatile int receiveWindowSizeLowerBound;
private final ConcurrentLinkedQueue<Object> pendingWriteQueue = private final BlockingQueue<Object> pendingWriteQueue = QueueFactory.createQueue();
new ConcurrentLinkedQueue<Object>();
public StreamState( public StreamState(
byte priority, boolean remoteSideClosed, boolean localSideClosed, byte priority, boolean remoteSideClosed, boolean localSideClosed,

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -97,7 +96,7 @@ public class SpdySessionHandler
@Override @Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Queue<Object> in = ctx.inboundMessageBuffer(); MessageBuf<Object> in = ctx.inboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -443,7 +442,7 @@ public class SpdySessionHandler
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
Queue<Object> in = ctx.outboundMessageBuffer(); MessageBuf<Object> in = ctx.outboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {

View File

@ -16,17 +16,16 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import java.util.Queue;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.outboundMessageBuffer(); MessageBuf<I> in = ctx.outboundMessageBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
for (;;) { for (;;) {

View File

@ -21,8 +21,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
import java.util.Queue;
public abstract class MessageToMessageDecoder<I, O> public abstract class MessageToMessageDecoder<I, O>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
@ -34,7 +32,7 @@ public abstract class MessageToMessageDecoder<I, O>
@Override @Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception { throws Exception {
Queue<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
boolean notify = false; boolean notify = false;
for (;;) { for (;;) {
try { try {

View File

@ -15,17 +15,16 @@
*/ */
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundMessageHandlerAdapter; import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
import java.util.Queue;
public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToMessageEncoder<I, O> extends ChannelOutboundMessageHandlerAdapter<I> {
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
Queue<I> in = ctx.outboundMessageBuffer(); MessageBuf<I> in = ctx.outboundMessageBuffer();
for (;;) { for (;;) {
try { try {
Object msg = in.poll(); Object msg = in.poll();

View File

@ -15,13 +15,13 @@
*/ */
package io.netty.example.factorial; package io.netty.example.factorial;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter; import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.Queue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level; import java.util.logging.Level;
@ -99,7 +99,7 @@ public class FactorialClientHandler extends ChannelInboundMessageHandlerAdapter<
private void sendNumbers() { private void sendNumbers() {
// Do not send more than 4096 numbers. // Do not send more than 4096 numbers.
boolean finished = false; boolean finished = false;
Queue<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
while (out.size() < 4096) { while (out.size() < 4096) {
if (i <= count) { if (i <= count) {
out.add(Integer.valueOf(i)); out.add(Integer.valueOf(i));

View File

@ -22,8 +22,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandler; import io.netty.channel.ChannelInboundMessageHandler;
import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.ChannelOutboundMessageHandler;
import java.util.Queue;
public class MessageLoggingHandler public class MessageLoggingHandler
extends LoggingHandler extends LoggingHandler
implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> { implements ChannelInboundMessageHandler<Object>, ChannelOutboundMessageHandler<Object> {
@ -62,12 +60,12 @@ public class MessageLoggingHandler
@Override @Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) public void inboundBufferUpdated(ChannelHandlerContext ctx)
throws Exception { throws Exception {
Queue<Object> buf = ctx.inboundMessageBuffer(); MessageBuf<Object> buf = ctx.inboundMessageBuffer();
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf))); logger.log(internalLevel, format(ctx, formatBuffer("RECEIVED", buf)));
} }
Queue<Object> out = ctx.nextInboundMessageBuffer(); MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
for (;;) { for (;;) {
Object o = buf.poll(); Object o = buf.poll();
if (o == null) { if (o == null) {
@ -81,12 +79,12 @@ public class MessageLoggingHandler
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception { throws Exception {
Queue<Object> buf = ctx.outboundMessageBuffer(); MessageBuf<Object> buf = ctx.outboundMessageBuffer();
if (logger.isEnabled(internalLevel)) { if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf))); logger.log(internalLevel, format(ctx, formatBuffer("WRITE", buf)));
} }
Queue<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
for (;;) { for (;;) {
Object o = buf.poll(); Object o = buf.poll();
if (o == null) { if (o == null) {
@ -97,7 +95,7 @@ public class MessageLoggingHandler
ctx.flush(future); ctx.flush(future);
} }
protected String formatBuffer(String message, Queue<Object> buf) { protected String formatBuffer(String message, MessageBuf<Object> buf) {
return message + '(' + buf.size() + "): " + buf; return message + '(' + buf.size() + "): " + buf;
} }
} }

View File

@ -248,7 +248,7 @@ public class ServerBootstrap {
@Override @Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) { public void inboundBufferUpdated(ChannelHandlerContext ctx) {
Queue<Channel> in = ctx.inboundMessageBuffer(); MessageBuf<Channel> in = ctx.inboundMessageBuffer();
for (;;) { for (;;) {
Channel child = in.poll(); Channel child = in.poll();
if (child == null) { if (child == null) {

View File

@ -26,7 +26,6 @@ import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque; import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -648,7 +647,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
} }
} else { } else {
Queue<Object> out = ctx.outboundMessageBuffer(); MessageBuf<Object> out = ctx.outboundMessageBuffer();
int oldSize = out.size(); int oldSize = out.size();
try { try {
doFlushMessageBuffer(out); doFlushMessageBuffer(out);
@ -717,7 +716,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected void doFlushByteBuffer(ByteBuf buf) throws Exception { protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -81,7 +81,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
} }
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -30,7 +30,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override @Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception { public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Queue<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
for (;;) { for (;;) {
I msg = in.poll(); I msg = in.poll();
if (msg == null) { if (msg == null) {

View File

@ -0,0 +1,42 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
public class IllegalBufferAccessException extends ChannelPipelineException {
private static final long serialVersionUID = -1313647533364810258L;
private static final String DEFAULT_MESSAGE =
"The buffers that belong to a " + ChannelHandlerContext.class.getSimpleName() +
" must be accessed from the thread of the associated " +
EventExecutor.class.getSimpleName() + ".";
public IllegalBufferAccessException() {
this(DEFAULT_MESSAGE);
}
public IllegalBufferAccessException(String message, Throwable cause) {
super(message, cause);
}
public IllegalBufferAccessException(String message) {
super(message);
}
public IllegalBufferAccessException(Throwable cause) {
super(cause);
}
}

View File

@ -100,7 +100,7 @@ public abstract class AbstractEmbeddedChannel extends AbstractChannel {
return state == 1; return state == 1;
} }
public Queue<Object> lastInboundMessageBuffer() { public MessageBuf<Object> lastInboundMessageBuffer() {
return lastInboundMessageBuffer; return lastInboundMessageBuffer;
} }

View File

@ -15,11 +15,11 @@
*/ */
package io.netty.channel.embedded; package io.netty.channel.embedded;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelBufferType; import io.netty.channel.ChannelBufferType;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue;
public class EmbeddedMessageChannel extends AbstractEmbeddedChannel { public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
@ -32,13 +32,13 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
return ChannelBufferType.MESSAGE; return ChannelBufferType.MESSAGE;
} }
public Queue<Object> inboundBuffer() { public MessageBuf<Object> inboundBuffer() {
return pipeline().inboundMessageBuffer(); return pipeline().inboundMessageBuffer();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Queue<Object> lastOutboundBuffer() { public MessageBuf<Object> lastOutboundBuffer() {
return (Queue<Object>) lastOutboundBuffer; return (MessageBuf<Object>) lastOutboundBuffer;
} }
public Object readOutbound() { public Object readOutbound() {
@ -66,13 +66,7 @@ public class EmbeddedMessageChannel extends AbstractEmbeddedChannel {
} }
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
for (;;) { buf.drainTo(lastOutboundBuffer());
Object o = buf.poll();
if (o == null) {
break;
}
lastOutboundBuffer().add(o);
}
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.local; package io.netty.channel.local;
import io.netty.buffer.MessageBuf;
import io.netty.channel.AbstractChannel; import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferType; import io.netty.channel.ChannelBufferType;
@ -31,7 +32,6 @@ import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException; import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
/** /**
* A {@link Channel} for the local transport. * A {@link Channel} for the local transport.
@ -203,7 +203,7 @@ public class LocalChannel extends AbstractChannel {
} }
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
if (state < 2) { if (state < 2) {
throw new NotYetConnectedException(); throw new NotYetConnectedException();
} }
@ -214,14 +214,7 @@ public class LocalChannel extends AbstractChannel {
final LocalChannel peer = this.peer; final LocalChannel peer = this.peer;
assert peer != null; assert peer != null;
Queue<Object> out = peer.pipeline().inboundMessageBuffer(); buf.drainTo(peer.pipeline().inboundMessageBuffer());
for (;;) {
Object msg = buf.poll();
if (msg == null) {
break;
}
out.add(msg);
}
peer.eventLoop().execute(new Runnable() { peer.eventLoop().execute(new Runnable() {
@Override @Override

View File

@ -15,13 +15,13 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferType; import io.netty.channel.ChannelBufferType;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.util.Queue;
abstract class AbstractNioMessageChannel extends AbstractNioChannel { abstract class AbstractNioMessageChannel extends AbstractNioChannel {
@ -46,7 +46,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final Queue<Object> msgBuf = pipeline.inboundMessageBuffer(); final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
try { try {
@ -82,7 +82,7 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
final int writeSpinCount = config().getWriteSpinCount() - 1; final int writeSpinCount = config().getWriteSpinCount() - 1;
while (!buf.isEmpty()) { while (!buf.isEmpty()) {
boolean wrote = false; boolean wrote = false;
@ -100,6 +100,6 @@ abstract class AbstractNioMessageChannel extends AbstractNioChannel {
} }
} }
protected abstract int doReadMessages(Queue<Object> buf) throws Exception; protected abstract int doReadMessages(MessageBuf<Object> buf) throws Exception;
protected abstract int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception; protected abstract int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception;
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -39,7 +40,6 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
/** /**
* Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}. * Provides an NIO based {@link io.netty.channel.socket.DatagramChannel}.
@ -160,7 +160,7 @@ public final class NioDatagramChannel
} }
@Override @Override
protected int doReadMessages(Queue<Object> buf) throws Exception { protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
DatagramChannel ch = javaChannel(); DatagramChannel ch = javaChannel();
ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize()); ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize());
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data);
@ -174,7 +174,7 @@ public final class NioDatagramChannel
} }
@Override @Override
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception { protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
DatagramPacket packet = (DatagramPacket) buf.peek(); DatagramPacket packet = (DatagramPacket) buf.peek();
ByteBuf data = packet.data(); ByteBuf data = packet.data();
ByteBuffer nioData; ByteBuffer nioData;

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.socket.nio; package io.netty.channel.socket.nio;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig;
@ -25,7 +26,6 @@ import java.net.SocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Queue;
public class NioServerSocketChannel extends AbstractNioMessageChannel public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel { implements io.netty.channel.socket.ServerSocketChannel {
@ -84,7 +84,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
} }
@Override @Override
protected int doReadMessages(Queue<Object> buf) throws Exception { protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept(); SocketChannel ch = javaChannel().accept();
if (ch == null) { if (ch == null) {
return 0; return 0;
@ -116,7 +116,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
} }
@Override @Override
protected int doWriteMessages(Queue<Object> buf, boolean lastSpin) throws Exception { protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
} }

View File

@ -15,12 +15,12 @@
*/ */
package io.netty.channel.socket.oio; package io.netty.channel.socket.oio;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferType; import io.netty.channel.ChannelBufferType;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import java.io.IOException; import java.io.IOException;
import java.util.Queue;
abstract class AbstractOioMessageChannel extends AbstractOioChannel { abstract class AbstractOioMessageChannel extends AbstractOioChannel {
@ -44,7 +44,7 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final Queue<Object> msgBuf = pipeline.inboundMessageBuffer(); final MessageBuf<Object> msgBuf = pipeline.inboundMessageBuffer();
boolean closed = false; boolean closed = false;
boolean read = false; boolean read = false;
try { try {
@ -75,12 +75,12 @@ abstract class AbstractOioMessageChannel extends AbstractOioChannel {
} }
@Override @Override
protected void doFlushMessageBuffer(Queue<Object> buf) throws Exception { protected void doFlushMessageBuffer(MessageBuf<Object> buf) throws Exception {
while (!buf.isEmpty()) { while (!buf.isEmpty()) {
doWriteMessages(buf); doWriteMessages(buf);
} }
} }
protected abstract int doReadMessages(Queue<Object> buf) throws Exception; protected abstract int doReadMessages(MessageBuf<Object> buf) throws Exception;
protected abstract void doWriteMessages(Queue<Object> buf) throws Exception; protected abstract void doWriteMessages(MessageBuf<Object> buf) throws Exception;
} }

View File

@ -16,6 +16,7 @@
package io.netty.channel.socket.oio; package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -35,7 +36,6 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Locale; import java.util.Locale;
import java.util.Queue;
public class OioDatagramChannel extends AbstractOioMessageChannel public class OioDatagramChannel extends AbstractOioMessageChannel
implements DatagramChannel { implements DatagramChannel {
@ -148,7 +148,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
} }
@Override @Override
protected int doReadMessages(Queue<Object> buf) throws Exception { protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
int packetSize = config().getReceivePacketSize(); int packetSize = config().getReceivePacketSize();
byte[] data = new byte[packetSize]; byte[] data = new byte[packetSize];
tmpPacket.setData(data); tmpPacket.setData(data);
@ -173,7 +173,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
} }
@Override @Override
protected void doWriteMessages(Queue<Object> buf) throws Exception { protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
DatagramPacket p = (DatagramPacket) buf.poll(); DatagramPacket p = (DatagramPacket) buf.poll();
ByteBuf data = p.data(); ByteBuf data = p.data();
int length = data.readableBytes(); int length = data.readableBytes();

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.channel.socket.oio; package io.netty.channel.socket.oio;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
@ -28,7 +29,6 @@ import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Queue;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -124,7 +124,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
} }
@Override @Override
protected int doReadMessages(Queue<Object> buf) throws Exception { protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
if (socket.isClosed()) { if (socket.isClosed()) {
return -1; return -1;
} }
@ -169,7 +169,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
} }
@Override @Override
protected void doWriteMessages(Queue<Object> buf) throws Exception { protected void doWriteMessages(MessageBuf<Object> buf) throws Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
} }

View File

@ -245,7 +245,7 @@ public class LocalTransportThreadModelTest {
ch.eventLoop().execute(new Runnable() { ch.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
Queue<Object> buf = ch.pipeline().inboundMessageBuffer(); MessageBuf<Object> buf = ch.pipeline().inboundMessageBuffer();
for (int j = start; j < end; j ++) { for (int j = start; j < end; j ++) {
buf.add(Integer.valueOf(j)); buf.add(Integer.valueOf(j));
} }
@ -285,7 +285,7 @@ public class LocalTransportThreadModelTest {
ch.pipeline().context(h6).executor().execute(new Runnable() { ch.pipeline().context(h6).executor().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
Queue<Object> buf = ch.pipeline().outboundMessageBuffer(); MessageBuf<Object> buf = ch.pipeline().outboundMessageBuffer();
for (int j = start; j < end; j ++) { for (int j = start; j < end; j ++) {
buf.add(Integer.valueOf(j)); buf.add(Integer.valueOf(j));
} }
@ -407,7 +407,7 @@ public class LocalTransportThreadModelTest {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
} }
Queue<Integer> in = ctx.inboundMessageBuffer(); MessageBuf<Integer> in = ctx.inboundMessageBuffer();
ByteBuf out = ctx.nextInboundByteBuffer(); ByteBuf out = ctx.nextInboundByteBuffer();
for (;;) { for (;;) {
@ -432,7 +432,7 @@ public class LocalTransportThreadModelTest {
boolean swallow = this == ctx.pipeline().first(); boolean swallow = this == ctx.pipeline().first();
ByteBuf in = ctx.outboundByteBuffer(); ByteBuf in = ctx.outboundByteBuffer();
Queue<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
while (in.readableBytes() >= 4) { while (in.readableBytes() >= 4) {
int msg = in.readInt(); int msg = in.readInt();
int expected = outCnt ++; int expected = outCnt ++;
@ -493,7 +493,7 @@ public class LocalTransportThreadModelTest {
} }
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
Queue<Object> out = ctx.nextInboundMessageBuffer(); MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
while (in.readableBytes() >= 4) { while (in.readableBytes() >= 4) {
int msg = in.readInt(); int msg = in.readInt();
@ -510,7 +510,7 @@ public class LocalTransportThreadModelTest {
ChannelFuture future) throws Exception { ChannelFuture future) throws Exception {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
Queue<Integer> in = ctx.outboundMessageBuffer(); MessageBuf<Integer> in = ctx.outboundMessageBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
for (;;) { for (;;) {
@ -566,8 +566,8 @@ public class LocalTransportThreadModelTest {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
} }
Queue<Object> in = ctx.inboundMessageBuffer(); MessageBuf<Object> in = ctx.inboundMessageBuffer();
Queue<Object> out = ctx.nextInboundMessageBuffer(); MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -586,8 +586,8 @@ public class LocalTransportThreadModelTest {
ChannelFuture future) throws Exception { ChannelFuture future) throws Exception {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
Queue<Object> in = ctx.outboundMessageBuffer(); MessageBuf<Object> in = ctx.outboundMessageBuffer();
Queue<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -642,7 +642,7 @@ public class LocalTransportThreadModelTest {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
} }
Queue<Object> in = ctx.inboundMessageBuffer(); MessageBuf<Object> in = ctx.inboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -658,8 +658,8 @@ public class LocalTransportThreadModelTest {
ChannelFuture future) throws Exception { ChannelFuture future) throws Exception {
Assert.assertSame(t, Thread.currentThread()); Assert.assertSame(t, Thread.currentThread());
Queue<Object> in = ctx.outboundMessageBuffer(); MessageBuf<Object> in = ctx.outboundMessageBuffer();
Queue<Object> out = ctx.nextOutboundMessageBuffer(); MessageBuf<Object> out = ctx.nextOutboundMessageBuffer();
for (;;) { for (;;) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {