Implement the local transport

- Replace the old local example with localecho example
- Channel's outbound buffer is guaranteed to be created on construction
  time.
This commit is contained in:
Trustin Lee 2012-05-30 03:58:14 -07:00
parent 392623749e
commit a53ecbf5f1
36 changed files with 686 additions and 1280 deletions

View File

@ -31,16 +31,15 @@ import java.util.Queue;
class EmbeddedChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private final ChannelBufferHolder<?> firstOut;
private final SocketAddress localAddress = new EmbeddedSocketAddress();
private final SocketAddress remoteAddress = new EmbeddedSocketAddress();
private final Queue<Object> productQueue;
private int state; // 0 = OPEN, 1 = ACTIVE, 2 = CLOSED
EmbeddedChannel(Queue<Object> productQueue) {
super(null, null);
super(null, null, ChannelBufferHolders.catchAllBuffer(
productQueue, ChannelBuffers.dynamicBuffer()));
this.productQueue = productQueue;
firstOut = ChannelBufferHolders.catchAllBuffer(productQueue, ChannelBuffers.dynamicBuffer());
}
@Override
@ -63,12 +62,6 @@ class EmbeddedChannel extends AbstractChannel {
return loop instanceof EmbeddedEventLoop;
}
@Override
@SuppressWarnings("unchecked")
protected ChannelBufferHolder<Object> firstOut() {
return (ChannelBufferHolder<Object>) firstOut;
}
@Override
protected SocketAddress localAddress0() {
return isActive()? localAddress : null;

View File

@ -1,109 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.local;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.local.DefaultLocalClientChannelFactory;
import io.netty.channel.local.DefaultLocalServerChannelFactory;
import io.netty.channel.local.LocalAddress;
import io.netty.example.echo.EchoServerHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
public class LocalExample {
private final String port;
public LocalExample(String port) {
this.port = port;
}
public void run() throws IOException {
// Address to bind on / connect to.
LocalAddress socketAddress = new LocalAddress(port);
// Configure the server.
ServerBootstrap sb = new ServerBootstrap(
new DefaultLocalServerChannelFactory());
// Set up the default server-side event pipeline.
EchoServerHandler handler = new EchoServerHandler();
sb.pipeline().addLast("handler", handler);
// Start up the server.
sb.bind(socketAddress);
// Configure the client.
ClientBootstrap cb = new ClientBootstrap(
new DefaultLocalClientChannelFactory());
// Set up the client-side pipeline factory.
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new StringDecoder(),
new StringEncoder(),
new LoggingHandler(InternalLogLevel.INFO));
}
});
// Make the connection attempt to the server.
ChannelFuture channelFuture = cb.connect(socketAddress);
channelFuture.awaitUninterruptibly();
// Read commands from the stdin.
System.out.println("Enter text (quit to end)");
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ;) {
String line = in.readLine();
if (line == null || "quit".equalsIgnoreCase(line)) {
break;
}
// Sends the received line to the server.
lastWriteFuture = channelFuture.channel().write(line);
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
channelFuture.channel().close();
// Wait until the connection is closed or the connection attempt fails.
channelFuture.channel().getCloseFuture().awaitUninterruptibly();
// Release all resources used by the local transport.
cb.releaseExternalResources();
sb.releaseExternalResources();
}
public static void main(String[] args) throws Exception {
new LocalExample("1").run();
}
}

View File

@ -1,105 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.local;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ClientBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.Channels;
import io.netty.channel.local.DefaultLocalClientChannelFactory;
import io.netty.channel.local.DefaultLocalServerChannelFactory;
import io.netty.channel.local.LocalAddress;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
public class LocalExampleMultithreaded {
private final String port;
public LocalExampleMultithreaded(String port) {
this.port = port;
}
public void run() {
LocalAddress socketAddress = new LocalAddress(port);
OrderedMemoryAwareThreadPoolExecutor eventExecutor =
new OrderedMemoryAwareThreadPoolExecutor(
5, 1000000, 10000000, 100,
TimeUnit.MILLISECONDS);
ServerBootstrap sb = new ServerBootstrap(
new DefaultLocalServerChannelFactory());
sb.setPipelineFactory(new LocalServerPipelineFactory(eventExecutor));
sb.bind(socketAddress);
ClientBootstrap cb = new ClientBootstrap(
new DefaultLocalClientChannelFactory());
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new StringDecoder(),
new StringEncoder(),
new LoggingHandler(InternalLogLevel.INFO));
}
});
// Read commands from array
String[] commands = { "First", "Second", "Third", "quit" };
for (int j = 0; j < 5 ; j++) {
System.err.println("Start " + j);
ChannelFuture channelFuture = cb.connect(socketAddress);
channelFuture.awaitUninterruptibly();
if (! channelFuture.isSuccess()) {
System.err.println("CANNOT CONNECT");
channelFuture.cause().printStackTrace();
break;
}
ChannelFuture lastWriteFuture = null;
for (String line: commands) {
// Sends the received line to the server.
lastWriteFuture = channelFuture.channel().write(line);
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
channelFuture.channel().close();
// Wait until the connection is closed or the connection attempt fails.
channelFuture.channel().getCloseFuture().awaitUninterruptibly();
System.err.println("End " + j);
}
// Release all resources
cb.releaseExternalResources();
sb.releaseExternalResources();
eventExecutor.shutdownNow();
}
public static void main(String[] args) throws Exception {
new LocalExampleMultithreaded("1").run();
}
}

View File

@ -1,81 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.local;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.execution.ExecutionHandler;
public class LocalServerPipelineFactory implements ChannelPipelineFactory {
private final ExecutionHandler executionHandler;
public LocalServerPipelineFactory(Executor eventExecutor) {
executionHandler = new ExecutionHandler(eventExecutor);
}
@Override
public ChannelPipeline getPipeline() throws Exception {
final ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("executor", executionHandler);
pipeline.addLast("handler", new EchoCloseServerHandler());
return pipeline;
}
static class EchoCloseServerHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof MessageEvent) {
final MessageEvent evt = (MessageEvent) e;
String msg = (String) evt.getMessage();
if (msg.equalsIgnoreCase("quit")) {
Channels.close(e.getChannel());
return;
}
}
ctx.sendUpstream(e);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof MessageEvent) {
final MessageEvent evt = (MessageEvent) e;
String msg = (String) evt.getMessage();
if (msg.equalsIgnoreCase("quit")) {
Channels.close(e.getChannel());
return;
}
System.err.println("SERVER:" + msg);
// Write back
Channels.write(e.getChannel(), msg);
}
ctx.sendDownstream(e);
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.localecho;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class LocalEcho {
private final String port;
public LocalEcho(String port) {
this.port = port;
}
public void run() throws Exception {
// Address to bind on / connect to.
final LocalAddress addr = new LocalAddress(port);
Bootstrap cb = new Bootstrap();
ServerBootstrap sb = new ServerBootstrap();
try {
// Note that we can use any event loop so that you can ensure certain local channels
// are handled by the same event loop thread which drives a certain socket channel.
sb.eventLoop(new NioEventLoop(), new NioEventLoop())
.channel(new LocalServerChannel())
.localAddress(addr)
.initializer(new ChannelInitializer<LocalServerChannel>() {
@Override
public void initChannel(LocalServerChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
}
})
.childInitializer(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new LocalEchoServerHandler());
}
});
cb.eventLoop(new NioEventLoop())
.channel(new LocalChannel())
.remoteAddress(addr)
.initializer(new ChannelInitializer<LocalChannel>() {
@Override
public void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new LocalEchoClientHandler());
}
});
Channel sch = sb.bind().sync().channel();
Channel ch = cb.connect().sync().channel();
// Read commands from the stdin.
System.out.println("Enter text (quit to end)");
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ;) {
String line = in.readLine();
if (line == null || "quit".equalsIgnoreCase(line)) {
break;
}
// Sends the received line to the server.
lastWriteFuture = ch.write(line);
}
// Wait until all messages are flushed before closing the channel.
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
}
ch.close().sync();
sch.close().sync();
} finally {
sb.shutdown();
cb.shutdown();
}
}
public static void main(String[] args) throws Exception {
new LocalEcho("1").run();
}
}

View File

@ -0,0 +1,19 @@
package io.netty.example.localecho;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
public class LocalEchoClientHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String msg) {
// Print as received
System.out.println(msg);
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -0,0 +1,19 @@
package io.netty.example.localecho;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
public class LocalEchoServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
@Override
public void messageReceived(ChannelInboundHandlerContext<String> ctx, String msg) {
// Write back as received
ctx.write(msg);
}
@Override
public void exceptionCaught(ChannelInboundHandlerContext<String> ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -80,7 +80,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private volatile EventLoop eventLoop;
private volatile boolean registered;
private final ChannelBufferHolder<Object> outbound;
private final ChannelBufferHolder<Object> directOutbound;
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private long writeCounter;
@ -102,7 +102,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* the parent of this channel. {@code null} if there's no parent.
*/
@SuppressWarnings("unchecked")
protected AbstractChannel(Channel parent, Integer id) {
protected AbstractChannel(Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
if (outboundBuffer == null) {
throw new NullPointerException("outboundBuffer");
}
if (id == null) {
id = allocateId(this);
} else {
@ -117,7 +121,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
outbound = (ChannelBufferHolder<Object>) newOutboundBuffer();
directOutbound = (ChannelBufferHolder<Object>) outboundBuffer;
closeFuture().addListener(new ChannelFutureListener() {
@Override
@ -370,27 +374,27 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final Runnable flushLaterTask = new FlushLater();
@Override
public ChannelBufferHolder<Object> directOutbound() {
return outbound;
public final ChannelBufferHolder<Object> directOutbound() {
return directOutbound;
}
@Override
public ChannelFuture voidFuture() {
public final ChannelFuture voidFuture() {
return voidFuture;
}
@Override
public SocketAddress localAddress() {
public final SocketAddress localAddress() {
return localAddress0();
}
@Override
public SocketAddress remoteAddress() {
public final SocketAddress remoteAddress() {
return remoteAddress0();
}
@Override
public void register(EventLoop eventLoop, ChannelFuture future) {
public final void register(EventLoop eventLoop, ChannelFuture future) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
@ -432,7 +436,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void bind(final SocketAddress localAddress, final ChannelFuture future) {
public final void bind(final SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
@ -461,7 +465,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void disconnect(final ChannelFuture future) {
public final void disconnect(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
try {
boolean wasActive = isActive();
@ -485,7 +489,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void close(final ChannelFuture future) {
public final void close(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (closeFuture.setClosed()) {
boolean wasActive = isActive();
@ -522,16 +526,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void deregister(final ChannelFuture future) {
public final void deregister(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!registered) {
future.setSuccess();
return;
}
try {
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
registered = false;
future.setSuccess();
pipeline().fireChannelUnregistered();
if (registered) {
registered = false;
future.setSuccess();
pipeline().fireChannelUnregistered();
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
future.setSuccess();
}
}
} else {
eventLoop().execute(new Runnable() {
@ -594,7 +610,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
@Override
public void flushNow() {
public final void flushNow() {
if (inFlushNow) {
return;
}
@ -630,7 +646,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
protected boolean ensureOpen(ChannelFuture future) {
protected final boolean ensureOpen(ChannelFuture future) {
if (isOpen()) {
return true;
}
@ -641,7 +657,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return false;
}
protected void closeIfClosed() {
protected final void closeIfClosed() {
if (isOpen()) {
return;
}
@ -659,8 +675,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract boolean isCompatible(EventLoop loop);
protected abstract ChannelBufferHolder<?> newOutboundBuffer();
protected abstract SocketAddress localAddress0();
protected abstract SocketAddress remoteAddress0();

View File

@ -33,7 +33,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
* Creates a new instance.
*/
protected AbstractServerChannel(Integer id) {
super(null, id);
super(null, id, ChannelBufferHolders.discardBuffer());
}
@Override
@ -47,8 +47,8 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
}
@Override
protected ChannelBufferHolder<Object> newOutboundBuffer() {
return ChannelBufferHolders.discardBuffer();
protected Unsafe newUnsafe() {
return new DefaultServerUnsafe();
}
@Override
@ -60,4 +60,53 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
protected void doDisconnect() throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected boolean isFlushPending() {
return false;
}
protected class DefaultServerUnsafe extends AbstractUnsafe {
@Override
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
reject(future);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
flush(future);
}
});
}
}
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
reject(future);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
}
private void reject(ChannelFuture future) {
Exception cause = new UnsupportedOperationException();
future.setFailure(cause);
pipeline().fireExceptionCaught(cause);
}
}
}

View File

@ -44,6 +44,12 @@ public final class ChannelBufferHolder<E> {
}
ChannelBufferHolder(Queue<E> msgBuf, ChannelBuffer byteBuf) {
if (msgBuf == null) {
throw new NullPointerException("msgBuf");
}
if (byteBuf == null) {
throw new NullPointerException("byteBuf");
}
ctx = null;
bypassDirection = 0;
this.msgBuf = msgBuf;

View File

@ -1,218 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import static io.netty.channel.Channels.*;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageEvent;
import io.netty.util.internal.QueueFactory;
import io.netty.util.internal.ThreadLocalBoolean;
/**
*/
final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
// TODO Move the state management up to AbstractChannel to remove duplication.
private static final int ST_OPEN = 0;
private static final int ST_BOUND = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_CLOSED = -1;
final AtomicInteger state = new AtomicInteger(ST_OPEN);
private final ChannelConfig config;
private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
final Queue<MessageEvent> writeBuffer = QueueFactory.createQueue(MessageEvent.class);
volatile DefaultLocalChannel pairedChannel;
volatile LocalAddress localAddress;
volatile LocalAddress remoteAddress;
static DefaultLocalChannel create(LocalServerChannel parent,
ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink,
DefaultLocalChannel pairedChannel) {
DefaultLocalChannel instance = new DefaultLocalChannel(parent, factory, pipeline, sink,
pairedChannel);
fireChannelOpen(instance);
return instance;
}
private DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) {
super(parent, factory, pipeline, sink);
this.pairedChannel = pairedChannel;
config = new DefaultChannelConfig();
// TODO Move the state variable to AbstractChannel so that we don't need
// to add many listeners.
getCloseFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
state.set(ST_CLOSED);
}
});
}
@Override
public ChannelConfig getConfig() {
return config;
}
@Override
public boolean isOpen() {
return state.get() >= ST_OPEN;
}
@Override
public boolean isBound() {
return state.get() >= ST_BOUND;
}
@Override
public boolean isConnected() {
return state.get() == ST_CONNECTED;
}
void setBound() throws ClosedChannelException {
if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
switch (state.get()) {
case ST_CLOSED:
throw new ClosedChannelException();
default:
throw new ChannelException("already bound");
}
}
}
void setConnected() {
if (state.get() != ST_CLOSED) {
state.set(ST_CONNECTED);
}
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
@Override
public LocalAddress getLocalAddress() {
return localAddress;
}
@Override
public LocalAddress getRemoteAddress() {
return remoteAddress;
}
void closeNow(ChannelFuture future) {
LocalAddress localAddress = this.localAddress;
try {
// Close the self.
if (!setClosed()) {
return;
}
DefaultLocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel != null) {
this.pairedChannel = null;
fireChannelDisconnected(this);
fireChannelUnbound(this);
}
fireChannelClosed(this);
// Close the peer.
if (pairedChannel == null || !pairedChannel.setClosed()) {
return;
}
DefaultLocalChannel me = pairedChannel.pairedChannel;
if (me != null) {
pairedChannel.pairedChannel = null;
fireChannelDisconnected(pairedChannel);
fireChannelUnbound(pairedChannel);
}
fireChannelClosed(pairedChannel);
} finally {
future.setSuccess();
if (localAddress != null && getParent() == null) {
LocalChannelRegistry.unregister(localAddress);
}
}
}
void flushWriteBuffer() {
DefaultLocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel != null) {
if (pairedChannel.isConnected()) {
// Channel is open and connected and channelConnected event has
// been fired.
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if (e == null) {
break;
}
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
fireWriteComplete(this, 1);
}
} finally {
delivering.set(false);
}
}
} else {
// Channel is open and connected but channelConnected event has
// not been fired yet.
}
} else {
// Channel is closed or not connected yet - notify as failures.
Exception cause;
if (isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
for (;;) {
MessageEvent e = writeBuffer.poll();
if (e == null) {
break;
}
e.getFuture().setFailure(cause);
fireExceptionCaught(this, cause);
}
}
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
/**
* The default {@link LocalClientChannelFactory} implementation.
* @apiviz.landmark
*/
public class DefaultLocalClientChannelFactory implements LocalClientChannelFactory {
private final ChannelSink sink;
/**
* Creates a new instance.
*/
public DefaultLocalClientChannelFactory() {
sink = new LocalClientChannelSink();
}
@Override
public LocalChannel newChannel(ChannelPipeline pipeline) {
return DefaultLocalChannel.create(null, this, pipeline, sink, null);
}
/**
* Does nothing because this implementation does not require any external
* resources.
*/
@Override
public void releaseExternalResources() {
// No external resources.
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import static io.netty.channel.Channels.*;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.DefaultServerChannelConfig;
/**
*/
final class DefaultLocalServerChannel extends AbstractServerChannel implements
LocalServerChannel {
final ChannelConfig channelConfig;
final AtomicBoolean bound = new AtomicBoolean();
volatile LocalAddress localAddress;
static DefaultLocalServerChannel create(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
DefaultLocalServerChannel instance =
new DefaultLocalServerChannel(factory, pipeline, sink);
fireChannelOpen(instance);
return instance;
}
private DefaultLocalServerChannel(ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink) {
super(factory, pipeline, sink);
channelConfig = new DefaultServerChannelConfig();
}
@Override
public ChannelConfig getConfig() {
return channelConfig;
}
@Override
public boolean isBound() {
return isOpen() && bound.get();
}
@Override
public LocalAddress getLocalAddress() {
return isBound() ? localAddress : null;
}
@Override
public LocalAddress getRemoteAddress() {
return null;
}
@Override
protected boolean setClosed() {
return super.setClosed();
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelSink;
import io.netty.channel.group.DefaultChannelGroup;
/**
* The default {@link LocalServerChannelFactory} implementation.
* @apiviz.landmark
*/
public class DefaultLocalServerChannelFactory implements LocalServerChannelFactory {
private final DefaultChannelGroup group = new DefaultChannelGroup();
private final ChannelSink sink = new LocalServerChannelSink();
@Override
public LocalServerChannel newChannel(ChannelPipeline pipeline) {
LocalServerChannel channel = DefaultLocalServerChannel.create(this, pipeline, sink);
group.add(channel);
return channel;
}
/**
* Release all the previous created channels. This takes care of calling {@link LocalChannelRegistry#unregister(LocalAddress)}
* for each if them.
*/
@Override
public void releaseExternalResources() {
group.close().awaitUninterruptibly();
}
}

View File

@ -15,34 +15,37 @@
*/
package io.netty.channel.local;
import io.netty.channel.Channel;
import java.net.SocketAddress;
/**
* An endpoint in the local transport. Each endpoint is identified by a unique
* case-insensitive string, except for the pre-defined value called
* {@code "ephemeral"}.
* case-insensitive string.
*
* <h3>Ephemeral Address</h3>
*
* An ephemeral address is an anonymous address which is assigned temporarily
* and is released as soon as the connection is closed. All ephemeral addresses
* have the same ID, {@code "ephemeral"}, but they are not equal to each other.
* @apiviz.landmark
*/
public final class LocalAddress extends SocketAddress implements Comparable<LocalAddress> {
private static final long serialVersionUID = -3601961747680808645L;
private static final long serialVersionUID = 4644331421130916435L;
public static final String EPHEMERAL = "ephemeral";
public static final LocalAddress ANY = new LocalAddress("ANY");
private final String id;
private final boolean ephemeral;
private final String strVal;
/**
* Creates a new instance with the specified ID.
* Creates a new ephemeral port based on the ID of the specified channel.
* Note that we prepend an upper-case character so that it never conflicts with
* the addresses created by a user, which are always lower-cased on construction time.
*/
public LocalAddress(int id) {
this(String.valueOf(id));
LocalAddress(Channel channel) {
StringBuilder buf = new StringBuilder(16);
buf.append("local:E");
buf.append(Long.toHexString(channel.id().intValue() & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(7, ':');
id = buf.substring(6);
strVal = buf.toString();
}
/**
@ -57,30 +60,19 @@ public final class LocalAddress extends SocketAddress implements Comparable<Loca
throw new IllegalArgumentException("empty id");
}
this.id = id;
ephemeral = id.equals("ephemeral");
strVal = "local:" + id;
}
/**
* Returns the ID of this address.
*/
public String getId() {
public String id() {
return id;
}
/**
* Returns {@code true} if and only if this address is ephemeral.
*/
public boolean isEphemeral() {
return ephemeral;
}
@Override
public int hashCode() {
if (ephemeral) {
return System.identityHashCode(this);
} else {
return id.hashCode();
}
return id.hashCode();
}
@Override
@ -89,50 +81,16 @@ public final class LocalAddress extends SocketAddress implements Comparable<Loca
return false;
}
if (ephemeral) {
return this == o;
} else {
return getId().equals(((LocalAddress) o).getId());
}
return id.equals(((LocalAddress) o).id);
}
// FIXME: This comparison is broken! Assign distinct port numbers for
// ephemeral ports, just like O/S does for port number 0. It will
// break backward compatibility though.
@Override
public int compareTo(LocalAddress o) {
if (ephemeral) {
if (o.ephemeral) {
if (this == o) {
return 0;
}
int a = System.identityHashCode(this);
int b = System.identityHashCode(o);
if (a < b) {
return -1;
} else if (a > b) {
return 1;
} else {
throw new Error(
"Two different ephemeral addresses have " +
"same identityHashCode.");
}
} else {
return 1;
}
} else {
if (o.ephemeral) {
return -1;
} else {
return getId().compareTo(o.getId());
}
}
return id.compareTo(o.id);
}
@Override
public String toString() {
return "local:" + getId();
return strVal;
}
}

View File

@ -15,14 +15,252 @@
*/
package io.netty.channel.local;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import java.net.SocketAddress;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
/**
* A {@link Channel} for the local transport.
*/
public interface LocalChannel extends Channel {
public class LocalChannel extends AbstractChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private volatile int state; // 0 - open, 1 - bound, 2 - connected, 3 - closed
private volatile LocalChannel peer;
private volatile LocalAddress localAddress;
private volatile LocalAddress remoteAddress;
private volatile ChannelFuture connectFuture;
public LocalChannel() {
this(null);
}
public LocalChannel(Integer id) {
super(null, id, ChannelBufferHolders.messageBuffer());
}
LocalChannel(LocalServerChannel parent, LocalChannel peer) {
super(parent, null, ChannelBufferHolders.messageBuffer());
this.peer = peer;
localAddress = parent.localAddress();
remoteAddress = peer.localAddress();
}
@Override
LocalAddress getLocalAddress();
public ChannelConfig config() {
return config;
}
@Override
LocalAddress getRemoteAddress();
public LocalServerChannel parent() {
return (LocalServerChannel) super.parent();
}
@Override
public LocalAddress localAddress() {
return (LocalAddress) super.localAddress();
}
@Override
public LocalAddress remoteAddress() {
return (LocalAddress) super.remoteAddress();
}
@Override
public boolean isOpen() {
return state < 3;
}
@Override
public boolean isActive() {
return state == 2;
}
@Override
protected Unsafe newUnsafe() {
return new LocalUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleThreadEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return localAddress;
}
@Override
protected SocketAddress remoteAddress0() {
return remoteAddress;
}
@Override
protected void doRegister() throws Exception {
if (peer != null) {
state = 2;
peer.remoteAddress = parent().localAddress();
peer.state = 2;
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
peer.connectFuture.setSuccess();
peer.pipeline().fireChannelActive();
}
});
}
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
this.localAddress =
LocalChannelRegistry.register(this, this.localAddress,
localAddress);
state = 1;
}
@Override
protected void doDisconnect() throws Exception {
doClose();
}
@Override
protected void doClose() throws Exception {
if (state > 2) {
// Closed already
return;
}
LocalChannelRegistry.unregister(localAddress);
localAddress = null;
state = 3;
if (peer.isActive()) {
peer.unsafe().close(peer.unsafe().voidFuture());
peer = null;
}
}
@Override
protected void doDeregister() throws Exception {
if (isOpen()) {
unsafe().close(unsafe().voidFuture());
}
}
@Override
protected void doFlush(ChannelBufferHolder<Object> buf) throws Exception {
if (state < 2) {
throw new NotYetConnectedException();
}
if (state > 2) {
throw new ClosedChannelException();
}
final LocalChannel peer = this.peer;
assert peer != null;
Queue<Object> in = buf.messageBuffer();
Queue<Object> out = peer.pipeline().inbound().messageBuffer();
for (;;) {
Object msg = in.poll();
if (msg == null) {
break;
}
out.add(msg);
}
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
peer.pipeline().fireInboundBufferUpdated();
}
});
}
@Override
protected boolean isFlushPending() {
return false;
}
private class LocalUnsafe extends AbstractUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
SocketAddress localAddress, final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
if (!ensureOpen(future)) {
return;
}
if (state == 2) {
Exception cause = new AlreadyConnectedException();
future.setFailure(cause);
pipeline().fireExceptionCaught(cause);
return;
}
if (connectFuture != null) {
throw new ConnectionPendingException();
}
connectFuture = future;
if (state != 1) {
// Not bound yet and no localAddress specified - get one.
if (localAddress == null) {
localAddress = new LocalAddress(LocalChannel.this);
}
}
if (localAddress != null) {
try {
doBind(localAddress);
} catch (Throwable t) {
future.setFailure(t);
pipeline().fireExceptionCaught(t);
close(voidFuture());
return;
}
}
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause =
new ChannelException("connection refused");
future.setFailure(cause);
pipeline().fireExceptionCaught(cause);
close(voidFuture());
return;
}
LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
peer = serverChannel.serve(LocalChannel.this);
} else {
final SocketAddress localAddress0 = localAddress;
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress0, future);
}
});
}
}
}
}

View File

@ -1,46 +1,45 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
*/
final class LocalChannelRegistry {
private static final ConcurrentMap<LocalAddress, Channel> map =
new ConcurrentHashMap<LocalAddress, Channel>();
private static final ConcurrentMap<LocalAddress, Channel> boundChannels =
new ConcurrentHashMap<LocalAddress, Channel>();
static boolean isRegistered(LocalAddress address) {
return map.containsKey(address);
static LocalAddress register(
Channel channel, LocalAddress oldLocalAddress, SocketAddress localAddress) {
if (oldLocalAddress != null) {
throw new ChannelException("already bound");
}
if (!(localAddress instanceof LocalAddress)) {
throw new ChannelException(
"unsupported address type: " + localAddress.getClass().getSimpleName());
}
LocalAddress addr = (LocalAddress) localAddress;
if (LocalAddress.ANY.equals(addr)) {
addr = new LocalAddress(channel);
}
Channel boundChannel = boundChannels.putIfAbsent(addr, channel);
if (boundChannel != null) {
throw new ChannelException("address already in use by: " + boundChannel);
}
return addr;
}
static Channel getChannel(LocalAddress address) {
return map.get(address);
static Channel get(SocketAddress localAddress) {
return boundChannels.get(localAddress);
}
static boolean register(LocalAddress address, Channel channel) {
return map.putIfAbsent(address, channel) == null;
}
static boolean unregister(LocalAddress address) {
return map.remove(address) != null;
static void unregister(LocalAddress localAddress) {
boundChannels.remove(localAddress);
}
private LocalChannelRegistry() {

View File

@ -1,27 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelPipeline;
/**
* A {@link ChannelFactory} that creates a client-side {@link LocalChannel}.
*/
public interface LocalClientChannelFactory extends ChannelFactory {
@Override
LocalChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -1,150 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import static io.netty.channel.Channels.*;
import java.io.IOException;
import java.net.ConnectException;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
*/
final class LocalClientChannelSink extends AbstractChannelSink {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
LocalClientChannelSink() {
}
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
DefaultLocalChannel channel =
(DefaultLocalChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (LocalAddress) value);
} else {
channel.closeNow(future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (LocalAddress) value);
} else {
channel.closeNow(future);
}
break;
case INTEREST_OPS:
// Unsupported - discard silently.
future.setSuccess();
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
DefaultLocalChannel channel = (DefaultLocalChannel) event.channel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.flushWriteBuffer();
}
}
private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
try {
if (!LocalChannelRegistry.register(localAddress, channel)) {
throw new ChannelException("address already in use: " + localAddress);
}
channel.setBound();
channel.localAddress = localAddress;
future.setSuccess();
fireChannelBound(channel, localAddress);
} catch (Throwable t) {
LocalChannelRegistry.unregister(localAddress);
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
future.setFailure(new ConnectException("connection refused"));
return;
}
DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
ChannelPipeline pipeline;
try {
pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
} catch (Exception e) {
future.setFailure(e);
fireExceptionCaught(channel, e);
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}
return;
}
future.setSuccess();
DefaultLocalChannel acceptedChannel = DefaultLocalChannel.create(serverChannel, serverChannel.getFactory(), pipeline, this, channel);
channel.pairedChannel = acceptedChannel;
bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
channel.remoteAddress = serverChannel.getLocalAddress();
channel.setConnected();
fireChannelConnected(channel, serverChannel.getLocalAddress());
acceptedChannel.localAddress = serverChannel.getLocalAddress();
try {
acceptedChannel.setBound();
} catch (IOException e) {
throw new Error(e);
}
fireChannelBound(acceptedChannel, channel.getRemoteAddress());
acceptedChannel.remoteAddress = channel.getLocalAddress();
acceptedChannel.setConnected();
fireChannelConnected(acceptedChannel, channel.getLocalAddress());
// Flush something that was written in channelBound / channelConnected
channel.flushWriteBuffer();
acceptedChannel.flushWriteBuffer();
}
}

View File

@ -15,14 +15,113 @@
*/
package io.netty.channel.local;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventLoop;
import java.net.SocketAddress;
/**
* A {@link ServerChannel} for the local transport.
*/
public interface LocalServerChannel extends ServerChannel {
public class LocalServerChannel extends AbstractServerChannel {
private final ChannelConfig config = new DefaultChannelConfig();
private volatile int state; // 0 - open, 1 - active, 2 - closed
private volatile LocalAddress localAddress;
public LocalServerChannel() {
this(null);
}
public LocalServerChannel(Integer id) {
super(id);
}
@Override
LocalAddress getLocalAddress();
public ChannelConfig config() {
return config;
}
@Override
LocalAddress getRemoteAddress();
public LocalAddress localAddress() {
return (LocalAddress) super.localAddress();
}
@Override
public LocalAddress remoteAddress() {
return (LocalAddress) super.remoteAddress();
}
@Override
public boolean isOpen() {
return state < 2;
}
@Override
public boolean isActive() {
return state == 1;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleThreadEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return localAddress;
}
@Override
protected void doRegister() throws Exception {
// NOOP
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
state = 1;
}
@Override
protected void doClose() throws Exception {
if (state > 1) {
// Closed already.
return;
}
LocalChannelRegistry.unregister(localAddress);
localAddress = null;
state = 2;
}
@Override
protected void doDeregister() throws Exception {
// NOOP
}
LocalChannel serve(final LocalChannel peer) {
LocalChannel child = new LocalChannel(this, peer);
serve0(child);
return child;
}
private void serve0(final LocalChannel child) {
if (eventLoop().inEventLoop()) {
pipeline().inbound().messageBuffer().add(child);
pipeline().fireInboundBufferUpdated();
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
serve0(child);
}
});
}
}
}

View File

@ -1,27 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannelFactory;
/**
* A {@link ServerChannelFactory} that creates a {@link LocalServerChannel}.
*/
public interface LocalServerChannelFactory extends ServerChannelFactory {
@Override
LocalServerChannel newChannel(ChannelPipeline pipeline);
}

View File

@ -1,144 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import static io.netty.channel.Channels.*;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.MessageEvent;
final class LocalServerChannelSink extends AbstractChannelSink {
LocalServerChannelSink() {
}
@Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof DefaultLocalServerChannel) {
handleServerChannel(e);
} else if (channel instanceof DefaultLocalChannel) {
handleAcceptedChannel(e);
}
}
private void handleServerChannel(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
DefaultLocalServerChannel channel =
(DefaultLocalServerChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (LocalAddress) value);
} else {
close(channel, future);
}
break;
}
}
private void handleAcceptedChannel(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
DefaultLocalChannel channel = (DefaultLocalChannel) event.channel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
channel.closeNow(future);
}
break;
case INTEREST_OPS:
// Unsupported - discard silently.
future.setSuccess();
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
DefaultLocalChannel channel = (DefaultLocalChannel) event.channel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.flushWriteBuffer();
}
}
private void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
try {
if (!LocalChannelRegistry.register(localAddress, channel)) {
throw new ChannelException("address already in use: " + localAddress);
}
if (!channel.bound.compareAndSet(false, true)) {
throw new ChannelException("already bound");
}
channel.localAddress = localAddress;
future.setSuccess();
fireChannelBound(channel, localAddress);
} catch (Throwable t) {
LocalChannelRegistry.unregister(localAddress);
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void close(DefaultLocalServerChannel channel, ChannelFuture future) {
try {
if (channel.setClosed()) {
future.setSuccess();
LocalAddress localAddress = channel.localAddress;
if (channel.bound.compareAndSet(true, false)) {
channel.localAddress = null;
LocalChannelRegistry.unregister(localAddress);
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
}

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.nio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
@ -49,8 +50,10 @@ public abstract class AbstractNioChannel extends AbstractChannel {
private ScheduledFuture<?> connectTimeoutFuture;
private ConnectException connectTimeoutException;
protected AbstractNioChannel(Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id);
protected AbstractNioChannel(
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer,
SelectableChannel ch, int defaultInterestOps) {
super(parent, id, outboundBuffer);
this.ch = ch;
this.defaultInterestOps = defaultInterestOps;
try {

View File

@ -2,7 +2,6 @@ package io.netty.channel.socket.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
@ -12,13 +11,9 @@ import java.util.Queue;
abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(
Channel parent, Integer id, SelectableChannel ch, int defaultInterestOps) {
super(parent, id, ch, defaultInterestOps);
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.messageBuffer();
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer,
SelectableChannel ch, int defaultInterestOps) {
super(parent, id, outboundBuffer, ch, defaultInterestOps);
}
@Override

View File

@ -14,12 +14,7 @@ abstract class AbstractNioStreamChannel extends AbstractNioChannel {
protected AbstractNioStreamChannel(
Channel parent, Integer id, SelectableChannel ch) {
super(parent, id, ch, SelectionKey.OP_READ);
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.byteBuffer();
super(parent, id, ChannelBufferHolders.byteBuffer(), ch, SelectionKey.OP_READ);
}
@Override

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -67,7 +66,7 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen
}
public NioDatagramChannel(Integer id, DatagramChannel socket) {
super(null, id, socket, SelectionKey.OP_READ);
super(null, id, ChannelBufferHolders.messageBuffer(), socket, SelectionKey.OP_READ);
config = new NioDatagramChannelConfig(socket);
}
@ -87,11 +86,6 @@ public final class NioDatagramChannel extends AbstractNioMessageChannel implemen
return (DatagramChannel) super.javaChannel();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.messageBuffer();
}
@Override
protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress();

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -43,7 +42,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
private final ServerSocketChannelConfig config;
public NioServerSocketChannel() {
super(null, null, newSocket(), SelectionKey.OP_ACCEPT);
super(null, null, ChannelBufferHolders.discardBuffer(), newSocket(), SelectionKey.OP_ACCEPT);
config = new DefaultServerSocketChannelConfig(javaChannel().socket());
}
@ -106,11 +105,6 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.discardBuffer();
}
@Override
protected SocketAddress remoteAddress0() {
return null;

View File

@ -17,8 +17,6 @@ package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannelConfig;
@ -89,11 +87,6 @@ public class NioSocketChannel extends AbstractNioStreamChannel implements io.net
return ch.isOpen() && ch.isConnected();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.byteBuffer();
}
@Override
protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress();

View File

@ -2,6 +2,7 @@ package io.netty.channel.socket.oio;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
@ -10,8 +11,8 @@ import java.net.SocketAddress;
abstract class AbstractOioChannel extends AbstractChannel {
protected AbstractOioChannel(Channel parent, Integer id) {
super(parent, id);
protected AbstractOioChannel(Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
super(parent, id, outboundBuffer);
}
@Override

View File

@ -2,7 +2,6 @@ package io.netty.channel.socket.oio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelPipeline;
import java.io.IOException;
@ -10,13 +9,9 @@ import java.util.Queue;
abstract class AbstractOioMessageChannel extends AbstractOioChannel {
protected AbstractOioMessageChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.messageBuffer();
protected AbstractOioMessageChannel(
Channel parent, Integer id, ChannelBufferHolder<?> outboundBuffer) {
super(parent, id, outboundBuffer);
}
@Override

View File

@ -11,12 +11,7 @@ import java.io.IOException;
abstract class AbstractOioStreamChannel extends AbstractOioChannel {
protected AbstractOioStreamChannel(Channel parent, Integer id) {
super(parent, id);
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.byteBuffer();
super(parent, id, ChannelBufferHolders.byteBuffer());
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
@ -66,7 +65,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
}
public OioDatagramChannel(Integer id, MulticastSocket socket) {
super(null, id);
super(null, id, ChannelBufferHolders.messageBuffer());
boolean success = false;
try {
@ -101,11 +100,6 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
return isOpen() && socket.isBound();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.messageBuffer();
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.socket.oio;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -61,7 +60,7 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
}
public OioServerSocketChannel(Integer id, ServerSocket socket) {
super(null, id);
super(null, id, ChannelBufferHolders.discardBuffer());
if (socket == null) {
throw new NullPointerException("socket");
}
@ -160,11 +159,6 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel
throw new UnsupportedOperationException();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.discardBuffer();
}
@Override
protected SocketAddress remoteAddress0() {
return null;

View File

@ -17,8 +17,6 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel;
@ -94,11 +92,6 @@ public class OioSocketChannel extends AbstractOioStreamChannel
return !socket.isClosed() && socket.isConnected();
}
@Override
protected ChannelBufferHolder<?> newOutboundBuffer() {
return ChannelBufferHolders.byteBuffer();
}
@Override
protected SocketAddress localAddress0() {
return socket.getLocalSocketAddress();

View File

@ -38,29 +38,6 @@ public class CompleteChannelFutureTest {
new CompleteChannelFutureImpl(null);
}
@Test
public void shouldNotifyImmediatelyOnAdd() throws Exception {
ChannelFutureListener l = createStrictMock(ChannelFutureListener.class);
l.operationComplete(future);
replay(l);
future.addListener(l);
verify(l);
}
@Test
public void shouldNotRethrowListenerException() {
ChannelFutureListener l = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
throw new ExpectedError();
}
};
future.addListener(l);
}
@Test
public void shouldNotDoAnythingOnRemove() throws Exception {
ChannelFutureListener l = createStrictMock(ChannelFutureListener.class);
@ -110,11 +87,4 @@ public class CompleteChannelFutureTest {
throw new Error();
}
}
private static class ExpectedError extends Error {
private static final long serialVersionUID = 7059276744882005047L;
ExpectedError() {
}
}
}

View File

@ -16,32 +16,49 @@
package io.netty.channel;
import static org.junit.Assert.*;
import io.netty.channel.local.LocalChannel;
import org.junit.Test;
public class DefaultChannelPipelineTest {
@Test
public void testReplaceChannelHandler() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
SimpleChannelHandler handler1 = new SimpleChannelHandler();
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel());
ChannelHandler handler1 = newHandler();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler1);
pipeline.addLast("handler3", handler1);
assertTrue(pipeline.get("handler1") == handler1);
assertTrue(pipeline.get("handler2") == handler1);
assertTrue(pipeline.get("handler3") == handler1);
SimpleChannelHandler newHandler1 = new SimpleChannelHandler();
ChannelHandler newHandler1 = newHandler();
pipeline.replace("handler1", "handler1", newHandler1);
assertTrue(pipeline.get("handler1") == newHandler1);
SimpleChannelHandler newHandler3 = new SimpleChannelHandler();
ChannelHandler newHandler3 = newHandler();
pipeline.replace("handler3", "handler3", newHandler3);
assertTrue(pipeline.get("handler3") == newHandler3);
SimpleChannelHandler newHandler2 = new SimpleChannelHandler();
ChannelHandler newHandler2 = newHandler();
pipeline.replace("handler2", "handler2", newHandler2);
assertTrue(pipeline.get("handler2") == newHandler2);
}
private static ChannelHandler newHandler() {
return new ChannelHandlerAdapter<Byte, Byte>() {
@Override
public ChannelBufferHolder<Byte> newInboundBuffer(
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
@Override
public ChannelBufferHolder<Byte> newOutboundBuffer(
ChannelOutboundHandlerContext<Byte> ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}
};
}
}