diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java index da158fe051..a5e59eef76 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/EmbeddedChannel.java @@ -31,16 +31,15 @@ import java.util.Queue; class EmbeddedChannel extends AbstractChannel { private final ChannelConfig config = new DefaultChannelConfig(); - private final ChannelBufferHolder firstOut; private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); private final Queue productQueue; private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED EmbeddedChannel(Queue productQueue) { - super(null, null); + super(null, null, ChannelBufferHolders.catchAllBuffer( + productQueue, ChannelBuffers.dynamicBuffer())); this.productQueue = productQueue; - firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer()); } @Override @@ -63,12 +62,6 @@ class EmbeddedChannel extends AbstractChannel { return loop instanceof EmbeddedEventLoop; } - @Override - @SuppressWarnings("unchecked") - protected ChannelBufferHolder firstOut() { - return (ChannelBufferHolder) firstOut; - } - @Override protected SocketAddress localAddress0() { return isActive()? localAddress : null; diff --git a/example/src/main/java/io/netty/example/local/LocalExample.java b/example/src/main/java/io/netty/example/local/LocalExample.java deleted file mode 100644 index 6935bc3e1c..0000000000 --- a/example/src/main/java/io/netty/example/local/LocalExample.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.example.local; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; - -import io.netty.bootstrap.ClientBootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.local.DefaultLocalClientChannelFactory; -import io.netty.channel.local.DefaultLocalServerChannelFactory; -import io.netty.channel.local.LocalAddress; -import io.netty.example.echo.EchoServerHandler; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.logging.LoggingHandler; -import io.netty.logging.InternalLogLevel; - -public class LocalExample { - - private final String port; - - public LocalExample(String port) { - this.port = port; - } - - public void run() throws IOException { - // Address to bind on / connect to. - LocalAddress socketAddress = new LocalAddress(port); - - // Configure the server. - ServerBootstrap sb = new ServerBootstrap( - new DefaultLocalServerChannelFactory()); - - // Set up the default server-side event pipeline. - EchoServerHandler handler = new EchoServerHandler(); - sb.pipeline().addLast("handler", handler); - - // Start up the server. - sb.bind(socketAddress); - - // Configure the client. - ClientBootstrap cb = new ClientBootstrap( - new DefaultLocalClientChannelFactory()); - - // Set up the client-side pipeline factory. - cb.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new StringDecoder(), - new StringEncoder(), - new LoggingHandler(InternalLogLevel.INFO)); - } - }); - - // Make the connection attempt to the server. - ChannelFuture channelFuture = cb.connect(socketAddress); - channelFuture.awaitUninterruptibly(); - - // Read commands from the stdin. - System.out.println("Enter text (quit to end)"); - ChannelFuture lastWriteFuture = null; - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - for (; ;) { - String line = in.readLine(); - if (line == null || "quit".equalsIgnoreCase(line)) { - break; - } - - // Sends the received line to the server. - lastWriteFuture = channelFuture.channel().write(line); - } - - // Wait until all messages are flushed before closing the channel. - if (lastWriteFuture != null) { - lastWriteFuture.awaitUninterruptibly(); - } - channelFuture.channel().close(); - - // Wait until the connection is closed or the connection attempt fails. - channelFuture.channel().getCloseFuture().awaitUninterruptibly(); - - // Release all resources used by the local transport. - cb.releaseExternalResources(); - sb.releaseExternalResources(); - } - - public static void main(String[] args) throws Exception { - new LocalExample("1").run(); - } -} diff --git a/example/src/main/java/io/netty/example/local/LocalExampleMultithreaded.java b/example/src/main/java/io/netty/example/local/LocalExampleMultithreaded.java deleted file mode 100644 index bb748a28ff..0000000000 --- a/example/src/main/java/io/netty/example/local/LocalExampleMultithreaded.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.example.local; - -import java.util.concurrent.TimeUnit; - -import io.netty.bootstrap.ClientBootstrap; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.Channels; -import io.netty.channel.local.DefaultLocalClientChannelFactory; -import io.netty.channel.local.DefaultLocalServerChannelFactory; -import io.netty.channel.local.LocalAddress; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor; -import io.netty.handler.logging.LoggingHandler; -import io.netty.logging.InternalLogLevel; - -public class LocalExampleMultithreaded { - - private final String port; - - public LocalExampleMultithreaded(String port) { - this.port = port; - } - - public void run() { - LocalAddress socketAddress = new LocalAddress(port); - - OrderedMemoryAwareThreadPoolExecutor eventExecutor = - new OrderedMemoryAwareThreadPoolExecutor( - 5, 1000000, 10000000, 100, - TimeUnit.MILLISECONDS); - - ServerBootstrap sb = new ServerBootstrap( - new DefaultLocalServerChannelFactory()); - - sb.setPipelineFactory(new LocalServerPipelineFactory(eventExecutor)); - sb.bind(socketAddress); - - ClientBootstrap cb = new ClientBootstrap( - new DefaultLocalClientChannelFactory()); - - cb.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { - return Channels.pipeline( - new StringDecoder(), - new StringEncoder(), - new LoggingHandler(InternalLogLevel.INFO)); - } - }); - - // Read commands from array - String[] commands = { "First", "Second", "Third", "quit" }; - for (int j = 0; j < 5 ; j++) { - System.err.println("Start " + j); - ChannelFuture channelFuture = cb.connect(socketAddress); - channelFuture.awaitUninterruptibly(); - if (! channelFuture.isSuccess()) { - System.err.println("CANNOT CONNECT"); - channelFuture.cause().printStackTrace(); - break; - } - ChannelFuture lastWriteFuture = null; - for (String line: commands) { - // Sends the received line to the server. - lastWriteFuture = channelFuture.channel().write(line); - } - - // Wait until all messages are flushed before closing the channel. - if (lastWriteFuture != null) { - lastWriteFuture.awaitUninterruptibly(); - } - channelFuture.channel().close(); - // Wait until the connection is closed or the connection attempt fails. - channelFuture.channel().getCloseFuture().awaitUninterruptibly(); - System.err.println("End " + j); - } - - // Release all resources - cb.releaseExternalResources(); - sb.releaseExternalResources(); - eventExecutor.shutdownNow(); - } - - public static void main(String[] args) throws Exception { - new LocalExampleMultithreaded("1").run(); - } -} diff --git a/example/src/main/java/io/netty/example/local/LocalServerPipelineFactory.java b/example/src/main/java/io/netty/example/local/LocalServerPipelineFactory.java deleted file mode 100644 index 3c28c5d764..0000000000 --- a/example/src/main/java/io/netty/example/local/LocalServerPipelineFactory.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.example.local; - -import java.util.concurrent.Executor; - -import io.netty.channel.ChannelDownstreamHandler; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.ChannelUpstreamHandler; -import io.netty.channel.Channels; -import io.netty.channel.MessageEvent; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; -import io.netty.handler.execution.ExecutionHandler; - -public class LocalServerPipelineFactory implements ChannelPipelineFactory { - - private final ExecutionHandler executionHandler; - - public LocalServerPipelineFactory(Executor eventExecutor) { - executionHandler = new ExecutionHandler(eventExecutor); - } - - @Override - public ChannelPipeline getPipeline() throws Exception { - final ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("decoder", new StringDecoder()); - pipeline.addLast("encoder", new StringEncoder()); - pipeline.addLast("executor", executionHandler); - pipeline.addLast("handler", new EchoCloseServerHandler()); - return pipeline; - } - - static class EchoCloseServerHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { - @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (e instanceof MessageEvent) { - final MessageEvent evt = (MessageEvent) e; - String msg = (String) evt.getMessage(); - if (msg.equalsIgnoreCase("quit")) { - Channels.close(e.getChannel()); - return; - } - } - ctx.sendUpstream(e); - } - - @Override - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) { - if (e instanceof MessageEvent) { - final MessageEvent evt = (MessageEvent) e; - String msg = (String) evt.getMessage(); - if (msg.equalsIgnoreCase("quit")) { - Channels.close(e.getChannel()); - return; - } - System.err.println("SERVER:" + msg); - // Write back - Channels.write(e.getChannel(), msg); - } - ctx.sendDownstream(e); - } - } -} diff --git a/example/src/main/java/io/netty/example/localecho/LocalEcho.java b/example/src/main/java/io/netty/example/localecho/LocalEcho.java new file mode 100644 index 0000000000..31a45e476d --- /dev/null +++ b/example/src/main/java/io/netty/example/localecho/LocalEcho.java @@ -0,0 +1,114 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.example.localecho; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; +import io.netty.channel.socket.nio.NioEventLoop; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.io.BufferedReader; +import java.io.InputStreamReader; + +public class LocalEcho { + + private final String port; + + public LocalEcho(String port) { + this.port = port; + } + + public void run() throws Exception { + // Address to bind on / connect to. + final LocalAddress addr = new LocalAddress(port); + + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + try { + // Note that we can use any event loop so that you can ensure certain local channels + // are handled by the same event loop thread which drives a certain socket channel. + sb.eventLoop(new NioEventLoop(), new NioEventLoop()) + .channel(new LocalServerChannel()) + .localAddress(addr) + .initializer(new ChannelInitializer() { + @Override + public void initChannel(LocalServerChannel ch) throws Exception { + ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); + } + }) + .childInitializer(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast( + new LoggingHandler(LogLevel.INFO), + new LocalEchoServerHandler()); + } + }); + + cb.eventLoop(new NioEventLoop()) + .channel(new LocalChannel()) + .remoteAddress(addr) + .initializer(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast( + new LoggingHandler(LogLevel.INFO), + new LocalEchoClientHandler()); + } + + }); + + Channel sch = sb.bind().sync().channel(); + Channel ch = cb.connect().sync().channel(); + + // Read commands from the stdin. + System.out.println("Enter text (quit to end)"); + ChannelFuture lastWriteFuture = null; + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + for (; ;) { + String line = in.readLine(); + if (line == null || "quit".equalsIgnoreCase(line)) { + break; + } + + // Sends the received line to the server. + lastWriteFuture = ch.write(line); + } + + // Wait until all messages are flushed before closing the channel. + if (lastWriteFuture != null) { + lastWriteFuture.awaitUninterruptibly(); + } + + ch.close().sync(); + sch.close().sync(); + } finally { + sb.shutdown(); + cb.shutdown(); + } + } + + public static void main(String[] args) throws Exception { + new LocalEcho("1").run(); + } +} diff --git a/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java new file mode 100644 index 0000000000..429d018b0d --- /dev/null +++ b/example/src/main/java/io/netty/example/localecho/LocalEchoClientHandler.java @@ -0,0 +1,19 @@ +package io.netty.example.localecho; + +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; + +public class LocalEchoClientHandler extends ChannelInboundMessageHandlerAdapter { + + @Override + public void messageReceived(ChannelInboundHandlerContext ctx, String msg) { + // Print as received + System.out.println(msg); + } + + @Override + public void exceptionCaught(ChannelInboundHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java b/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java new file mode 100644 index 0000000000..fb10b88b3e --- /dev/null +++ b/example/src/main/java/io/netty/example/localecho/LocalEchoServerHandler.java @@ -0,0 +1,19 @@ +package io.netty.example.localecho; + +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; + +public class LocalEchoServerHandler extends ChannelInboundMessageHandlerAdapter { + + @Override + public void messageReceived(ChannelInboundHandlerContext ctx, String msg) { + // Write back as received + ctx.write(msg); + } + + @Override + public void exceptionCaught(ChannelInboundHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c7d15f6945..8fb4cd8d2a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -80,7 +80,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile EventLoop eventLoop; private volatile boolean registered; - private final ChannelBufferHolder outbound; + private final ChannelBufferHolder directOutbound; private ClosedChannelException closedChannelException; private final Deque flushCheckpoints = new ArrayDeque(); private long writeCounter; @@ -102,7 +102,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha * the parent of this channel. {@code null} if there's no parent. */ @SuppressWarnings("unchecked") - protected AbstractChannel(Channel parent, Integer id) { + protected AbstractChannel(Channel parent, Integer id, ChannelBufferHolder outboundBuffer) { + if (outboundBuffer == null) { + throw new NullPointerException("outboundBuffer"); + } + if (id == null) { id = allocateId(this); } else { @@ -117,7 +121,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.parent = parent; this.id = id; unsafe = newUnsafe(); - outbound = (ChannelBufferHolder) newOutboundBuffer(); + directOutbound = (ChannelBufferHolder) outboundBuffer; closeFuture().addListener(new ChannelFutureListener() { @Override @@ -370,27 +374,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private final Runnable flushLaterTask = new FlushLater(); @Override - public ChannelBufferHolder directOutbound() { - return outbound; + public final ChannelBufferHolder directOutbound() { + return directOutbound; } @Override - public ChannelFuture voidFuture() { + public final ChannelFuture voidFuture() { return voidFuture; } @Override - public SocketAddress localAddress() { + public final SocketAddress localAddress() { return localAddress0(); } @Override - public SocketAddress remoteAddress() { + public final SocketAddress remoteAddress() { return remoteAddress0(); } @Override - public void register(EventLoop eventLoop, ChannelFuture future) { + public final void register(EventLoop eventLoop, ChannelFuture future) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } @@ -432,7 +436,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void bind(final SocketAddress localAddress, final ChannelFuture future) { + public final void bind(final SocketAddress localAddress, final ChannelFuture future) { if (eventLoop().inEventLoop()) { if (!ensureOpen(future)) { return; @@ -461,7 +465,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void disconnect(final ChannelFuture future) { + public final void disconnect(final ChannelFuture future) { if (eventLoop().inEventLoop()) { try { boolean wasActive = isActive(); @@ -485,7 +489,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void close(final ChannelFuture future) { + public final void close(final ChannelFuture future) { if (eventLoop().inEventLoop()) { if (closeFuture.setClosed()) { boolean wasActive = isActive(); @@ -522,16 +526,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void deregister(final ChannelFuture future) { + public final void deregister(final ChannelFuture future) { if (eventLoop().inEventLoop()) { + if (!registered) { + future.setSuccess(); + return; + } + try { doDeregister(); } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { - registered = false; - future.setSuccess(); - pipeline().fireChannelUnregistered(); + if (registered) { + registered = false; + future.setSuccess(); + pipeline().fireChannelUnregistered(); + } else { + // Some transports like local and AIO does not allow the deregistration of + // an open channel. Their doDeregister() calls close(). Consequently, + // close() calls deregister() again - no need to fire channelUnregistered. + future.setSuccess(); + } } } else { eventLoop().execute(new Runnable() { @@ -594,7 +610,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void flushNow() { + public final void flushNow() { if (inFlushNow) { return; } @@ -630,7 +646,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - protected boolean ensureOpen(ChannelFuture future) { + protected final boolean ensureOpen(ChannelFuture future) { if (isOpen()) { return true; } @@ -641,7 +657,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return false; } - protected void closeIfClosed() { + protected final void closeIfClosed() { if (isOpen()) { return; } @@ -659,8 +675,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract boolean isCompatible(EventLoop loop); - protected abstract ChannelBufferHolder newOutboundBuffer(); - protected abstract SocketAddress localAddress0(); protected abstract SocketAddress remoteAddress0(); diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 0d9b8cac86..8020ca8840 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -33,7 +33,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S * Creates a new instance. */ protected AbstractServerChannel(Integer id) { - super(null, id); + super(null, id, ChannelBufferHolders.discardBuffer()); } @Override @@ -47,8 +47,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.discardBuffer(); + protected Unsafe newUnsafe() { + return new DefaultServerUnsafe(); } @Override @@ -60,4 +60,53 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean isFlushPending() { + return false; + } + + protected class DefaultServerUnsafe extends AbstractUnsafe { + + @Override + public void flush(final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + reject(future); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + flush(future); + } + }); + } + } + + @Override + public void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, + final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + reject(future); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress, future); + } + }); + } + } + + private void reject(ChannelFuture future) { + Exception cause = new UnsupportedOperationException(); + future.setFailure(cause); + pipeline().fireExceptionCaught(cause); + } + } } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 7048464a5f..4efdb10ffb 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -44,6 +44,12 @@ public final class ChannelBufferHolder { } ChannelBufferHolder(Queue msgBuf, ChannelBuffer byteBuf) { + if (msgBuf == null) { + throw new NullPointerException("msgBuf"); + } + if (byteBuf == null) { + throw new NullPointerException("byteBuf"); + } ctx = null; bypassDirection = 0; this.msgBuf = msgBuf; diff --git a/transport/src/main/java/io/netty/channel/local/DefaultLocalChannel.java b/transport/src/main/java/io/netty/channel/local/DefaultLocalChannel.java deleted file mode 100644 index 0507ea959a..0000000000 --- a/transport/src/main/java/io/netty/channel/local/DefaultLocalChannel.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import static io.netty.channel.Channels.*; - -import java.nio.channels.ClosedChannelException; -import java.nio.channels.NotYetConnectedException; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; - -import io.netty.channel.AbstractChannel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.MessageEvent; -import io.netty.util.internal.QueueFactory; -import io.netty.util.internal.ThreadLocalBoolean; - -/** - */ -final class DefaultLocalChannel extends AbstractChannel implements LocalChannel { - - // TODO Move the state management up to AbstractChannel to remove duplication. - private static final int ST_OPEN = 0; - private static final int ST_BOUND = 1; - private static final int ST_CONNECTED = 2; - private static final int ST_CLOSED = -1; - final AtomicInteger state = new AtomicInteger(ST_OPEN); - - private final ChannelConfig config; - private final ThreadLocalBoolean delivering = new ThreadLocalBoolean(); - - final Queue writeBuffer = QueueFactory.createQueue(MessageEvent.class); - - volatile DefaultLocalChannel pairedChannel; - volatile LocalAddress localAddress; - volatile LocalAddress remoteAddress; - - static DefaultLocalChannel create(LocalServerChannel parent, - ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, - DefaultLocalChannel pairedChannel) { - DefaultLocalChannel instance = new DefaultLocalChannel(parent, factory, pipeline, sink, - pairedChannel); - fireChannelOpen(instance); - return instance; - } - - private DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) { - super(parent, factory, pipeline, sink); - this.pairedChannel = pairedChannel; - config = new DefaultChannelConfig(); - - // TODO Move the state variable to AbstractChannel so that we don't need - // to add many listeners. - getCloseFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - state.set(ST_CLOSED); - } - }); - - } - - @Override - public ChannelConfig getConfig() { - return config; - } - - @Override - public boolean isOpen() { - return state.get() >= ST_OPEN; - } - - @Override - public boolean isBound() { - return state.get() >= ST_BOUND; - } - - @Override - public boolean isConnected() { - return state.get() == ST_CONNECTED; - } - - void setBound() throws ClosedChannelException { - if (!state.compareAndSet(ST_OPEN, ST_BOUND)) { - switch (state.get()) { - case ST_CLOSED: - throw new ClosedChannelException(); - default: - throw new ChannelException("already bound"); - } - } - } - - void setConnected() { - if (state.get() != ST_CLOSED) { - state.set(ST_CONNECTED); - } - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } - - @Override - public LocalAddress getLocalAddress() { - return localAddress; - } - - @Override - public LocalAddress getRemoteAddress() { - return remoteAddress; - } - - void closeNow(ChannelFuture future) { - LocalAddress localAddress = this.localAddress; - try { - // Close the self. - if (!setClosed()) { - return; - } - - DefaultLocalChannel pairedChannel = this.pairedChannel; - if (pairedChannel != null) { - this.pairedChannel = null; - fireChannelDisconnected(this); - fireChannelUnbound(this); - } - fireChannelClosed(this); - - // Close the peer. - if (pairedChannel == null || !pairedChannel.setClosed()) { - return; - } - - DefaultLocalChannel me = pairedChannel.pairedChannel; - if (me != null) { - pairedChannel.pairedChannel = null; - fireChannelDisconnected(pairedChannel); - fireChannelUnbound(pairedChannel); - } - fireChannelClosed(pairedChannel); - } finally { - future.setSuccess(); - if (localAddress != null && getParent() == null) { - LocalChannelRegistry.unregister(localAddress); - } - } - } - - void flushWriteBuffer() { - DefaultLocalChannel pairedChannel = this.pairedChannel; - if (pairedChannel != null) { - if (pairedChannel.isConnected()) { - // Channel is open and connected and channelConnected event has - // been fired. - if (!delivering.get()) { - delivering.set(true); - try { - for (;;) { - MessageEvent e = writeBuffer.poll(); - if (e == null) { - break; - } - - e.getFuture().setSuccess(); - fireMessageReceived(pairedChannel, e.getMessage()); - fireWriteComplete(this, 1); - } - } finally { - delivering.set(false); - } - } - } else { - // Channel is open and connected but channelConnected event has - // not been fired yet. - } - } else { - // Channel is closed or not connected yet - notify as failures. - Exception cause; - if (isOpen()) { - cause = new NotYetConnectedException(); - } else { - cause = new ClosedChannelException(); - } - - for (;;) { - MessageEvent e = writeBuffer.poll(); - if (e == null) { - break; - } - - e.getFuture().setFailure(cause); - fireExceptionCaught(this, cause); - } - } - } -} diff --git a/transport/src/main/java/io/netty/channel/local/DefaultLocalClientChannelFactory.java b/transport/src/main/java/io/netty/channel/local/DefaultLocalClientChannelFactory.java deleted file mode 100644 index 61524cc16b..0000000000 --- a/transport/src/main/java/io/netty/channel/local/DefaultLocalClientChannelFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; - -/** - * The default {@link LocalClientChannelFactory} implementation. - * @apiviz.landmark - */ -public class DefaultLocalClientChannelFactory implements LocalClientChannelFactory { - - private final ChannelSink sink; - - /** - * Creates a new instance. - */ - public DefaultLocalClientChannelFactory() { - sink = new LocalClientChannelSink(); - } - - @Override - public LocalChannel newChannel(ChannelPipeline pipeline) { - return DefaultLocalChannel.create(null, this, pipeline, sink, null); - } - - /** - * Does nothing because this implementation does not require any external - * resources. - */ - @Override - public void releaseExternalResources() { - // No external resources. - } -} diff --git a/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannel.java deleted file mode 100644 index 592ea2ddb2..0000000000 --- a/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannel.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import static io.netty.channel.Channels.*; - -import java.util.concurrent.atomic.AtomicBoolean; - -import io.netty.channel.AbstractServerChannel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.DefaultServerChannelConfig; - -/** - */ -final class DefaultLocalServerChannel extends AbstractServerChannel implements - LocalServerChannel { - - final ChannelConfig channelConfig; - - final AtomicBoolean bound = new AtomicBoolean(); - - volatile LocalAddress localAddress; - - static DefaultLocalServerChannel create(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink) { - DefaultLocalServerChannel instance = - new DefaultLocalServerChannel(factory, pipeline, sink); - fireChannelOpen(instance); - return instance; - } - - private DefaultLocalServerChannel(ChannelFactory factory, - ChannelPipeline pipeline, ChannelSink sink) { - super(factory, pipeline, sink); - channelConfig = new DefaultServerChannelConfig(); - } - - @Override - public ChannelConfig getConfig() { - return channelConfig; - } - - @Override - public boolean isBound() { - return isOpen() && bound.get(); - } - - @Override - public LocalAddress getLocalAddress() { - return isBound() ? localAddress : null; - } - - @Override - public LocalAddress getRemoteAddress() { - return null; - } - - @Override - protected boolean setClosed() { - return super.setClosed(); - } -} diff --git a/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannelFactory.java b/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannelFactory.java deleted file mode 100644 index 7df60dcd27..0000000000 --- a/transport/src/main/java/io/netty/channel/local/DefaultLocalServerChannelFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelSink; -import io.netty.channel.group.DefaultChannelGroup; - -/** - * The default {@link LocalServerChannelFactory} implementation. - * @apiviz.landmark - */ -public class DefaultLocalServerChannelFactory implements LocalServerChannelFactory { - - private final DefaultChannelGroup group = new DefaultChannelGroup(); - - private final ChannelSink sink = new LocalServerChannelSink(); - - @Override - public LocalServerChannel newChannel(ChannelPipeline pipeline) { - LocalServerChannel channel = DefaultLocalServerChannel.create(this, pipeline, sink); - group.add(channel); - return channel; - } - - /** - * Release all the previous created channels. This takes care of calling {@link LocalChannelRegistry#unregister(LocalAddress)} - * for each if them. - */ - @Override - public void releaseExternalResources() { - group.close().awaitUninterruptibly(); - } -} diff --git a/transport/src/main/java/io/netty/channel/local/LocalAddress.java b/transport/src/main/java/io/netty/channel/local/LocalAddress.java index 1853690d82..d057ae5fc1 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalAddress.java +++ b/transport/src/main/java/io/netty/channel/local/LocalAddress.java @@ -15,34 +15,37 @@ */ package io.netty.channel.local; +import io.netty.channel.Channel; + import java.net.SocketAddress; /** * An endpoint in the local transport. Each endpoint is identified by a unique - * case-insensitive string, except for the pre-defined value called - * {@code "ephemeral"}. + * case-insensitive string. * - *

Ephemeral Address

- * - * An ephemeral address is an anonymous address which is assigned temporarily - * and is released as soon as the connection is closed. All ephemeral addresses - * have the same ID, {@code "ephemeral"}, but they are not equal to each other. * @apiviz.landmark */ public final class LocalAddress extends SocketAddress implements Comparable { - private static final long serialVersionUID = -3601961747680808645L; + private static final long serialVersionUID = 4644331421130916435L; - public static final String EPHEMERAL = "ephemeral"; + public static final LocalAddress ANY = new LocalAddress("ANY"); private final String id; - private final boolean ephemeral; + private final String strVal; /** - * Creates a new instance with the specified ID. + * Creates a new ephemeral port based on the ID of the specified channel. + * Note that we prepend an upper-case character so that it never conflicts with + * the addresses created by a user, which are always lower-cased on construction time. */ - public LocalAddress(int id) { - this(String.valueOf(id)); + LocalAddress(Channel channel) { + StringBuilder buf = new StringBuilder(16); + buf.append("local:E"); + buf.append(Long.toHexString(channel.id().intValue() & 0xFFFFFFFFL | 0x100000000L)); + buf.setCharAt(7, ':'); + id = buf.substring(6); + strVal = buf.toString(); } /** @@ -57,30 +60,19 @@ public final class LocalAddress extends SocketAddress implements Comparable b) { - return 1; - } else { - throw new Error( - "Two different ephemeral addresses have " + - "same identityHashCode."); - } - } else { - return 1; - } - } else { - if (o.ephemeral) { - return -1; - } else { - return getId().compareTo(o.getId()); - } - } + return id.compareTo(o.id); } @Override public String toString() { - return "local:" + getId(); + return strVal; } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 0859f224de..166af54e21 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -15,14 +15,252 @@ */ package io.netty.channel.local; +import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; +import io.netty.channel.SingleThreadEventLoop; + +import java.net.SocketAddress; +import java.nio.channels.AlreadyConnectedException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ConnectionPendingException; +import java.nio.channels.NotYetConnectedException; +import java.util.Queue; /** * A {@link Channel} for the local transport. */ -public interface LocalChannel extends Channel { +public class LocalChannel extends AbstractChannel { + + private final ChannelConfig config = new DefaultChannelConfig(); + + private volatile int state; // 0 - open, 1 - bound, 2 - connected, 3 - closed + private volatile LocalChannel peer; + private volatile LocalAddress localAddress; + private volatile LocalAddress remoteAddress; + private volatile ChannelFuture connectFuture; + + public LocalChannel() { + this(null); + } + + public LocalChannel(Integer id) { + super(null, id, ChannelBufferHolders.messageBuffer()); + } + + LocalChannel(LocalServerChannel parent, LocalChannel peer) { + super(parent, null, ChannelBufferHolders.messageBuffer()); + this.peer = peer; + localAddress = parent.localAddress(); + remoteAddress = peer.localAddress(); + } + @Override - LocalAddress getLocalAddress(); + public ChannelConfig config() { + return config; + } + @Override - LocalAddress getRemoteAddress(); + public LocalServerChannel parent() { + return (LocalServerChannel) super.parent(); + } + + @Override + public LocalAddress localAddress() { + return (LocalAddress) super.localAddress(); + } + + @Override + public LocalAddress remoteAddress() { + return (LocalAddress) super.remoteAddress(); + } + + @Override + public boolean isOpen() { + return state < 3; + } + + @Override + public boolean isActive() { + return state == 2; + } + + @Override + protected Unsafe newUnsafe() { + return new LocalUnsafe(); + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof SingleThreadEventLoop; + } + + @Override + protected SocketAddress localAddress0() { + return localAddress; + } + + @Override + protected SocketAddress remoteAddress0() { + return remoteAddress; + } + + @Override + protected void doRegister() throws Exception { + if (peer != null) { + state = 2; + + peer.remoteAddress = parent().localAddress(); + peer.state = 2; + peer.eventLoop().execute(new Runnable() { + @Override + public void run() { + peer.connectFuture.setSuccess(); + peer.pipeline().fireChannelActive(); + } + }); + } + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + this.localAddress = + LocalChannelRegistry.register(this, this.localAddress, + localAddress); + state = 1; + } + + @Override + protected void doDisconnect() throws Exception { + doClose(); + } + + @Override + protected void doClose() throws Exception { + if (state > 2) { + // Closed already + return; + } + + LocalChannelRegistry.unregister(localAddress); + localAddress = null; + state = 3; + if (peer.isActive()) { + peer.unsafe().close(peer.unsafe().voidFuture()); + peer = null; + } + } + + @Override + protected void doDeregister() throws Exception { + if (isOpen()) { + unsafe().close(unsafe().voidFuture()); + } + } + + @Override + protected void doFlush(ChannelBufferHolder buf) throws Exception { + if (state < 2) { + throw new NotYetConnectedException(); + } + if (state > 2) { + throw new ClosedChannelException(); + } + + final LocalChannel peer = this.peer; + assert peer != null; + + Queue in = buf.messageBuffer(); + Queue out = peer.pipeline().inbound().messageBuffer(); + for (;;) { + Object msg = in.poll(); + if (msg == null) { + break; + } + out.add(msg); + } + + peer.eventLoop().execute(new Runnable() { + @Override + public void run() { + peer.pipeline().fireInboundBufferUpdated(); + } + }); + } + + @Override + protected boolean isFlushPending() { + return false; + } + + private class LocalUnsafe extends AbstractUnsafe { + + @Override + public void connect(final SocketAddress remoteAddress, + SocketAddress localAddress, final ChannelFuture future) { + if (eventLoop().inEventLoop()) { + if (!ensureOpen(future)) { + return; + } + + if (state == 2) { + Exception cause = new AlreadyConnectedException(); + future.setFailure(cause); + pipeline().fireExceptionCaught(cause); + return; + } + + if (connectFuture != null) { + throw new ConnectionPendingException(); + } + + connectFuture = future; + + if (state != 1) { + // Not bound yet and no localAddress specified - get one. + if (localAddress == null) { + localAddress = new LocalAddress(LocalChannel.this); + } + } + + if (localAddress != null) { + try { + doBind(localAddress); + } catch (Throwable t) { + future.setFailure(t); + pipeline().fireExceptionCaught(t); + close(voidFuture()); + return; + } + } + + Channel boundChannel = LocalChannelRegistry.get(remoteAddress); + if (!(boundChannel instanceof LocalServerChannel)) { + Exception cause = + new ChannelException("connection refused"); + future.setFailure(cause); + pipeline().fireExceptionCaught(cause); + close(voidFuture()); + return; + } + + LocalServerChannel serverChannel = (LocalServerChannel) boundChannel; + peer = serverChannel.serve(LocalChannel.this); + } else { + final SocketAddress localAddress0 = localAddress; + eventLoop().execute(new Runnable() { + @Override + public void run() { + connect(remoteAddress, localAddress0, future); + } + }); + } + } + } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java b/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java index e689d5db76..06924d5ee3 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannelRegistry.java @@ -1,46 +1,45 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ package io.netty.channel.local; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import java.net.SocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -/** - */ final class LocalChannelRegistry { - private static final ConcurrentMap map = - new ConcurrentHashMap(); + private static final ConcurrentMap boundChannels = + new ConcurrentHashMap(); - static boolean isRegistered(LocalAddress address) { - return map.containsKey(address); + static LocalAddress register( + Channel channel, LocalAddress oldLocalAddress, SocketAddress localAddress) { + if (oldLocalAddress != null) { + throw new ChannelException("already bound"); + } + if (!(localAddress instanceof LocalAddress)) { + throw new ChannelException( + "unsupported address type: " + localAddress.getClass().getSimpleName()); + } + + LocalAddress addr = (LocalAddress) localAddress; + if (LocalAddress.ANY.equals(addr)) { + addr = new LocalAddress(channel); + } + + Channel boundChannel = boundChannels.putIfAbsent(addr, channel); + if (boundChannel != null) { + throw new ChannelException("address already in use by: " + boundChannel); + } + return addr; } - static Channel getChannel(LocalAddress address) { - return map.get(address); + static Channel get(SocketAddress localAddress) { + return boundChannels.get(localAddress); } - static boolean register(LocalAddress address, Channel channel) { - return map.putIfAbsent(address, channel) == null; - } - - static boolean unregister(LocalAddress address) { - return map.remove(address) != null; + static void unregister(LocalAddress localAddress) { + boundChannels.remove(localAddress); } private LocalChannelRegistry() { diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelFactory.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelFactory.java deleted file mode 100644 index fe21844823..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelPipeline; - -/** - * A {@link ChannelFactory} that creates a client-side {@link LocalChannel}. - */ -public interface LocalClientChannelFactory extends ChannelFactory { - @Override - LocalChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java deleted file mode 100644 index 8fc49859ca..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import static io.netty.channel.Channels.*; - -import java.io.IOException; -import java.net.ConnectException; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -/** - */ -final class LocalClientChannelSink extends AbstractChannelSink { - - private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class); - - LocalClientChannelSink() { - } - - @Override - public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent event = (ChannelStateEvent) e; - - DefaultLocalChannel channel = - (DefaultLocalChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.closeNow(future); - } - break; - case BOUND: - if (value != null) { - bind(channel, future, (LocalAddress) value); - } else { - channel.closeNow(future); - } - break; - case CONNECTED: - if (value != null) { - connect(channel, future, (LocalAddress) value); - } else { - channel.closeNow(future); - } - break; - case INTEREST_OPS: - // Unsupported - discard silently. - future.setSuccess(); - break; - } - } else if (e instanceof MessageEvent) { - MessageEvent event = (MessageEvent) e; - DefaultLocalChannel channel = (DefaultLocalChannel) event.channel(); - boolean offered = channel.writeBuffer.offer(event); - assert offered; - channel.flushWriteBuffer(); - } - } - - private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { - try { - if (!LocalChannelRegistry.register(localAddress, channel)) { - throw new ChannelException("address already in use: " + localAddress); - } - - channel.setBound(); - channel.localAddress = localAddress; - future.setSuccess(); - fireChannelBound(channel, localAddress); - } catch (Throwable t) { - LocalChannelRegistry.unregister(localAddress); - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) { - Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress); - if (!(remoteChannel instanceof DefaultLocalServerChannel)) { - future.setFailure(new ConnectException("connection refused")); - return; - } - - DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel; - ChannelPipeline pipeline; - try { - pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline(); - } catch (Exception e) { - future.setFailure(e); - fireExceptionCaught(channel, e); - if (logger.isWarnEnabled()) { - logger.warn( - "Failed to initialize an accepted socket.", e); - } - return; - } - - future.setSuccess(); - DefaultLocalChannel acceptedChannel = DefaultLocalChannel.create(serverChannel, serverChannel.getFactory(), pipeline, this, channel); - channel.pairedChannel = acceptedChannel; - - bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL)); - channel.remoteAddress = serverChannel.getLocalAddress(); - channel.setConnected(); - fireChannelConnected(channel, serverChannel.getLocalAddress()); - - acceptedChannel.localAddress = serverChannel.getLocalAddress(); - try { - acceptedChannel.setBound(); - } catch (IOException e) { - throw new Error(e); - } - fireChannelBound(acceptedChannel, channel.getRemoteAddress()); - acceptedChannel.remoteAddress = channel.getLocalAddress(); - acceptedChannel.setConnected(); - fireChannelConnected(acceptedChannel, channel.getLocalAddress()); - - // Flush something that was written in channelBound / channelConnected - channel.flushWriteBuffer(); - acceptedChannel.flushWriteBuffer(); - } -} diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 90648d0e6b..bfdb227e7a 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -15,14 +15,113 @@ */ package io.netty.channel.local; +import io.netty.channel.AbstractServerChannel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.EventLoop; import io.netty.channel.ServerChannel; +import io.netty.channel.SingleThreadEventLoop; + +import java.net.SocketAddress; /** * A {@link ServerChannel} for the local transport. */ -public interface LocalServerChannel extends ServerChannel { +public class LocalServerChannel extends AbstractServerChannel { + + private final ChannelConfig config = new DefaultChannelConfig(); + + private volatile int state; // 0 - open, 1 - active, 2 - closed + private volatile LocalAddress localAddress; + + public LocalServerChannel() { + this(null); + } + + public LocalServerChannel(Integer id) { + super(id); + } + @Override - LocalAddress getLocalAddress(); + public ChannelConfig config() { + return config; + } + @Override - LocalAddress getRemoteAddress(); + public LocalAddress localAddress() { + return (LocalAddress) super.localAddress(); + } + + @Override + public LocalAddress remoteAddress() { + return (LocalAddress) super.remoteAddress(); + } + + @Override + public boolean isOpen() { + return state < 2; + } + + @Override + public boolean isActive() { + return state == 1; + } + + @Override + protected boolean isCompatible(EventLoop loop) { + return loop instanceof SingleThreadEventLoop; + } + + @Override + protected SocketAddress localAddress0() { + return localAddress; + } + + @Override + protected void doRegister() throws Exception { + // NOOP + } + + @Override + protected void doBind(SocketAddress localAddress) throws Exception { + this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress); + state = 1; + } + + @Override + protected void doClose() throws Exception { + if (state > 1) { + // Closed already. + return; + } + + LocalChannelRegistry.unregister(localAddress); + localAddress = null; + state = 2; + } + + @Override + protected void doDeregister() throws Exception { + // NOOP + } + + LocalChannel serve(final LocalChannel peer) { + LocalChannel child = new LocalChannel(this, peer); + serve0(child); + return child; + } + + private void serve0(final LocalChannel child) { + if (eventLoop().inEventLoop()) { + pipeline().inbound().messageBuffer().add(child); + pipeline().fireInboundBufferUpdated(); + } else { + eventLoop().execute(new Runnable() { + @Override + public void run() { + serve0(child); + } + }); + } + } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelFactory.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelFactory.java deleted file mode 100644 index 3e96324867..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ServerChannelFactory; - -/** - * A {@link ServerChannelFactory} that creates a {@link LocalServerChannel}. - */ -public interface LocalServerChannelFactory extends ServerChannelFactory { - @Override - LocalServerChannel newChannel(ChannelPipeline pipeline); -} diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java deleted file mode 100644 index 537aad2646..0000000000 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.local; - -import static io.netty.channel.Channels.*; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelState; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.MessageEvent; - -final class LocalServerChannelSink extends AbstractChannelSink { - - LocalServerChannelSink() { - } - - @Override - public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - Channel channel = e.getChannel(); - if (channel instanceof DefaultLocalServerChannel) { - handleServerChannel(e); - } else if (channel instanceof DefaultLocalChannel) { - handleAcceptedChannel(e); - } - } - - private void handleServerChannel(ChannelEvent e) { - if (!(e instanceof ChannelStateEvent)) { - return; - } - - ChannelStateEvent event = (ChannelStateEvent) e; - DefaultLocalServerChannel channel = - (DefaultLocalServerChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - close(channel, future); - } - break; - case BOUND: - if (value != null) { - bind(channel, future, (LocalAddress) value); - } else { - close(channel, future); - } - break; - } - } - - private void handleAcceptedChannel(ChannelEvent e) { - if (e instanceof ChannelStateEvent) { - ChannelStateEvent event = (ChannelStateEvent) e; - DefaultLocalChannel channel = (DefaultLocalChannel) event.channel(); - ChannelFuture future = event.getFuture(); - ChannelState state = event.getState(); - Object value = event.getValue(); - - switch (state) { - case OPEN: - if (Boolean.FALSE.equals(value)) { - channel.closeNow(future); - } - break; - case BOUND: - case CONNECTED: - if (value == null) { - channel.closeNow(future); - } - break; - case INTEREST_OPS: - // Unsupported - discard silently. - future.setSuccess(); - break; - } - } else if (e instanceof MessageEvent) { - MessageEvent event = (MessageEvent) e; - DefaultLocalChannel channel = (DefaultLocalChannel) event.channel(); - boolean offered = channel.writeBuffer.offer(event); - assert offered; - channel.flushWriteBuffer(); - } - } - - private void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) { - try { - if (!LocalChannelRegistry.register(localAddress, channel)) { - throw new ChannelException("address already in use: " + localAddress); - } - if (!channel.bound.compareAndSet(false, true)) { - throw new ChannelException("already bound"); - } - - channel.localAddress = localAddress; - future.setSuccess(); - fireChannelBound(channel, localAddress); - } catch (Throwable t) { - LocalChannelRegistry.unregister(localAddress); - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } - - private void close(DefaultLocalServerChannel channel, ChannelFuture future) { - try { - if (channel.setClosed()) { - future.setSuccess(); - LocalAddress localAddress = channel.localAddress; - if (channel.bound.compareAndSet(true, false)) { - channel.localAddress = null; - LocalChannelRegistry.unregister(localAddress); - fireChannelUnbound(channel); - } - fireChannelClosed(channel); - } else { - future.setSuccess(); - } - } catch (Throwable t) { - future.setFailure(t); - fireExceptionCaught(channel, t); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 7252280ea2..87d57d1191 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; @@ -49,8 +50,10 @@ public abstract class AbstractNioChannel extends AbstractChannel { private ScheduledFuture connectTimeoutFuture; private ConnectException connectTimeoutException; - protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) { - super(parent, id); + protected AbstractNioChannel( + Channel parent, Integer id, ChannelBufferHolder outboundBuffer, + SelectableChannel ch, int defaultInterestOps) { + super(parent, id, outboundBuffer); this.ch = ch; this.defaultInterestOps = defaultInterestOps; try { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java index 29748c32e6..8b74ec9257 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioMessageChannel.java @@ -2,7 +2,6 @@ package io.netty.channel.socket.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelPipeline; import java.io.IOException; @@ -12,13 +11,9 @@ import java.util.Queue; abstract class AbstractNioMessageChannel extends AbstractNioChannel { protected AbstractNioMessageChannel( - Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) { - super(parent, id, ch, defaultInterestOps); - } - - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.messageBuffer(); + Channel parent, Integer id, ChannelBufferHolder outboundBuffer, + SelectableChannel ch, int defaultInterestOps) { + super(parent, id, outboundBuffer, ch, defaultInterestOps); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java index 68c4aeac64..353cdeb6d5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioStreamChannel.java @@ -14,12 +14,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel { protected AbstractNioStreamChannel( Channel parent, Integer id, SelectableChannel ch) { - super(parent, id, ch, SelectionKey.OP_READ); - } - - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.byteBuffer(); + super(parent, id, ChannelBufferHolders.byteBuffer(), ch, SelectionKey.OP_READ); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 952423f464..5441829ebe 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -67,7 +66,7 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen } public NioDatagramChannel(Integer id, DatagramChannel socket) { - super(null, id, socket, SelectionKey.OP_READ); + super(null, id, ChannelBufferHolders.messageBuffer(), socket, SelectionKey.OP_READ); config = new NioDatagramChannelConfig(socket); } @@ -87,11 +86,6 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen return (DatagramChannel) super.javaChannel(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.messageBuffer(); - } - @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index e4ec9e1f19..958cea0e9c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -15,7 +15,6 @@ */ package io.netty.channel.socket.nio; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultServerSocketChannelConfig; @@ -43,7 +42,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel private final ServerSocketChannelConfig config; public NioServerSocketChannel() { - super(null, null, newSocket(), SelectionKey.OP_ACCEPT); + super(null, null, ChannelBufferHolders.discardBuffer(), newSocket(), SelectionKey.OP_ACCEPT); config = new DefaultServerSocketChannelConfig(javaChannel().socket()); } @@ -106,11 +105,6 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel throw new UnsupportedOperationException(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.discardBuffer(); - } - @Override protected SocketAddress remoteAddress0() { return null; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 2ee77cca43..517d20a763 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -17,8 +17,6 @@ package io.netty.channel.socket.nio; import io.netty.buffer.ChannelBuffer; import io.netty.channel.Channel; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig; @@ -89,11 +87,6 @@ public class NioSocketChannel extends AbstractNioStreamChannel implements io.net return ch.isOpen() && ch.isConnected(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.byteBuffer(); - } - @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index bad38d9557..d30dff40ab 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -2,6 +2,7 @@ package io.netty.channel.socket.oio; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoop; @@ -10,8 +11,8 @@ import java.net.SocketAddress; abstract class AbstractOioChannel extends AbstractChannel { - protected AbstractOioChannel(Channel parent, Integer id) { - super(parent, id); + protected AbstractOioChannel(Channel parent, Integer id, ChannelBufferHolder outboundBuffer) { + super(parent, id, outboundBuffer); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 45c95526fe..13763c9102 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -2,7 +2,6 @@ package io.netty.channel.socket.oio; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelPipeline; import java.io.IOException; @@ -10,13 +9,9 @@ import java.util.Queue; abstract class AbstractOioMessageChannel extends AbstractOioChannel { - protected AbstractOioMessageChannel(Channel parent, Integer id) { - super(parent, id); - } - - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.messageBuffer(); + protected AbstractOioMessageChannel( + Channel parent, Integer id, ChannelBufferHolder outboundBuffer) { + super(parent, id, outboundBuffer); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java index e9bd2fe440..4f151508da 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioStreamChannel.java @@ -11,12 +11,7 @@ import java.io.IOException; abstract class AbstractOioStreamChannel extends AbstractOioChannel { protected AbstractOioStreamChannel(Channel parent, Integer id) { - super(parent, id); - } - - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.byteBuffer(); + super(parent, id, ChannelBufferHolders.byteBuffer()); } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 6262f4374e..435c4c3fdc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -66,7 +65,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel } public OioDatagramChannel(Integer id, MulticastSocket socket) { - super(null, id); + super(null, id, ChannelBufferHolders.messageBuffer()); boolean success = false; try { @@ -101,11 +100,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel return isOpen() && socket.isBound(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.messageBuffer(); - } - @Override protected SocketAddress localAddress0() { return socket.getLocalSocketAddress(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 98fd4bb1bb..2252750bc2 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -15,7 +15,6 @@ */ package io.netty.channel.socket.oio; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultServerSocketChannelConfig; @@ -61,7 +60,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel } public OioServerSocketChannel(Integer id, ServerSocket socket) { - super(null, id); + super(null, id, ChannelBufferHolders.discardBuffer()); if (socket == null) { throw new NullPointerException("socket"); } @@ -160,11 +159,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel throw new UnsupportedOperationException(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.discardBuffer(); - } - @Override protected SocketAddress remoteAddress0() { return null; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 8d243dadc8..4d84dd5a36 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -17,8 +17,6 @@ package io.netty.channel.socket.oio; import io.netty.buffer.ChannelBuffer; import io.netty.channel.Channel; -import io.netty.channel.ChannelBufferHolder; -import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelException; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.SocketChannel; @@ -94,11 +92,6 @@ public class OioSocketChannel extends AbstractOioStreamChannel return !socket.isClosed() && socket.isConnected(); } - @Override - protected ChannelBufferHolder newOutboundBuffer() { - return ChannelBufferHolders.byteBuffer(); - } - @Override protected SocketAddress localAddress0() { return socket.getLocalSocketAddress(); diff --git a/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java b/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java index 013ea38d56..8f4629f4ec 100644 --- a/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java +++ b/transport/src/test/java/io/netty/channel/CompleteChannelFutureTest.java @@ -38,29 +38,6 @@ public class CompleteChannelFutureTest { new CompleteChannelFutureImpl(null); } - @Test - public void shouldNotifyImmediatelyOnAdd() throws Exception { - ChannelFutureListener l = createStrictMock(ChannelFutureListener.class); - l.operationComplete(future); - replay(l); - - future.addListener(l); - verify(l); - } - - @Test - public void shouldNotRethrowListenerException() { - ChannelFutureListener l = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - throw new ExpectedError(); - } - }; - - future.addListener(l); - } - @Test public void shouldNotDoAnythingOnRemove() throws Exception { ChannelFutureListener l = createStrictMock(ChannelFutureListener.class); @@ -110,11 +87,4 @@ public class CompleteChannelFutureTest { throw new Error(); } } - - private static class ExpectedError extends Error { - private static final long serialVersionUID = 7059276744882005047L; - - ExpectedError() { - } - } } diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index 8fd40b355d..9298fa102d 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -16,32 +16,49 @@ package io.netty.channel; import static org.junit.Assert.*; +import io.netty.channel.local.LocalChannel; import org.junit.Test; public class DefaultChannelPipelineTest { @Test public void testReplaceChannelHandler() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(); - - SimpleChannelHandler handler1 = new SimpleChannelHandler(); + DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + + ChannelHandler handler1 = newHandler(); pipeline.addLast("handler1", handler1); pipeline.addLast("handler2", handler1); pipeline.addLast("handler3", handler1); assertTrue(pipeline.get("handler1") == handler1); assertTrue(pipeline.get("handler2") == handler1); assertTrue(pipeline.get("handler3") == handler1); - - SimpleChannelHandler newHandler1 = new SimpleChannelHandler(); + + ChannelHandler newHandler1 = newHandler(); pipeline.replace("handler1", "handler1", newHandler1); assertTrue(pipeline.get("handler1") == newHandler1); - - SimpleChannelHandler newHandler3 = new SimpleChannelHandler(); + + ChannelHandler newHandler3 = newHandler(); pipeline.replace("handler3", "handler3", newHandler3); assertTrue(pipeline.get("handler3") == newHandler3); - - SimpleChannelHandler newHandler2 = new SimpleChannelHandler(); + + ChannelHandler newHandler2 = newHandler(); pipeline.replace("handler2", "handler2", newHandler2); assertTrue(pipeline.get("handler2") == newHandler2); } + + private static ChannelHandler newHandler() { + return new ChannelHandlerAdapter() { + @Override + public ChannelBufferHolder newInboundBuffer( + ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(); + } + + @Override + public ChannelBufferHolder newOutboundBuffer( + ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.byteBuffer(); + } + }; + } }