* Improved the local transport to behave more closely to real transports

* Changed how ChannelFactory is instantiated
This commit is contained in:
Trustin Lee 2009-02-09 07:34:49 +00:00
parent 5af89d1985
commit 9410fd155e
21 changed files with 370 additions and 247 deletions

View File

@ -23,11 +23,15 @@ package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
@ -45,12 +49,15 @@ public class LocalChannel extends AbstractChannel {
}
};
volatile LocalChannel pairedChannel = null;
volatile LocalChannel pairedChannel;
volatile LocalAddress localAddress;
final AtomicBoolean bound = new AtomicBoolean();
private final LocalChannelConfig config;
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
protected LocalChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) {
protected LocalChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, LocalChannel pairedChannel) {
super(null, factory, pipeline, sink);
this.pairedChannel = pairedChannel;
config = new LocalChannelConfig();
fireChannelOpen(this);
}
@ -60,38 +67,101 @@ public class LocalChannel extends AbstractChannel {
}
public boolean isBound() {
return true;
return isOpen() && bound.get();
}
public boolean isConnected() {
return true;
return pairedChannel != null;
}
public LocalAddress getLocalAddress() {
// FIXME: should return LocalAddress
return null;
return isBound()? localAddress : null;
}
public LocalAddress getRemoteAddress() {
// FIXME: should return LocalAddress
return null;
LocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel == null) {
return null;
} else {
return pairedChannel.getLocalAddress();
}
}
void writeNow(LocalChannel pairedChannel) {
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
void closeNow(ChannelFuture future) {
LocalAddress localAddress = this.localAddress;
try {
// Close the self.
if (!setClosed()) {
future.setSuccess();
return;
}
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
LocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel != null) {
this.pairedChannel = null;
this.localAddress = null;
fireChannelDisconnected(this);
fireChannelUnbound(this);
}
fireChannelClosed(this);
// Close the peer.
if (!pairedChannel.setClosed()) {
return;
}
LocalChannel me = pairedChannel.pairedChannel;
if (me != null) {
pairedChannel.pairedChannel = null;
pairedChannel.localAddress = null;
fireChannelDisconnected(pairedChannel);
fireChannelUnbound(pairedChannel);
}
fireChannelClosed(pairedChannel);
} finally {
if (localAddress != null) {
LocalChannelRegistry.unregister(localAddress);
}
}
}
void flushWriteBuffer() {
LocalChannel pairedChannel = this.pairedChannel;
if (pairedChannel == null || !pairedChannel.isConnected()) {
// Channel is closed or not connected yet - notify as failures.
Exception cause;
if (isOpen()) {
cause = new NotYetConnectedException();
} else {
cause = new ClosedChannelException();
}
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
e.getFuture().setFailure(cause);
fireExceptionCaught(this, cause);
}
} else {
// Channel is open and connected - trigger events.
if (!delivering.get()) {
delivering.set(true);
try {
for (;;) {
MessageEvent e = writeBuffer.poll();
if(e == null) {
break;
}
e.getFuture().setSuccess();
fireMessageReceived(pairedChannel, e.getMessage());
}
} finally {
delivering.set(false);
}
} finally {
delivering.set(false);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.util.ConcurrentIdentityHashMap;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
final class LocalChannelRegistry {
private static final ConcurrentMap<LocalAddress, Channel> map =
new ConcurrentIdentityHashMap<LocalAddress, Channel>();
static boolean isRegistered(LocalAddress address) {
return map.containsKey(address);
}
static Channel getChannel(LocalAddress address) {
return map.get(address);
}
static boolean register(LocalAddress address, Channel channel) {
return map.putIfAbsent(address, channel) == null;
}
static boolean unregister(LocalAddress address) {
return map.remove(address) != null;
}
private LocalChannelRegistry() {
// Unused
}
}

View File

@ -34,12 +34,12 @@ public class LocalClientChannelFactory implements ChannelFactory {
private final ChannelSink sink;
public LocalClientChannelFactory(LocalServerChannelFactory serverFactory) {
sink = new LocalClientChannelSink(serverFactory.channel, serverFactory.sink);
public LocalClientChannelFactory() {
sink = new LocalClientChannelSink();
}
public Channel newChannel(ChannelPipeline pipeline) {
return new LocalChannel(this, pipeline, sink);
return new LocalChannel(this, pipeline, sink, null);
}
public void releaseExternalResources() {

View File

@ -23,29 +23,29 @@ package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.net.ConnectException;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
final class LocalClientChannelSink extends AbstractChannelSink {
private final Channel serverChannel;
private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
private final ChannelSink serverSink;
LocalClientChannelSink(Channel channel, ChannelSink sink) {
serverChannel = channel;
serverSink = sink;
LocalClientChannelSink() {
super();
}
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
@ -58,39 +58,86 @@ final class LocalClientChannelSink extends AbstractChannelSink {
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
future.setSuccess();
fireChannelDisconnected(channel);
fireChannelClosed(channel);
fireChannelDisconnected(channel.pairedChannel);
fireChannelClosed(channel.pairedChannel);
}
break;
case BOUND:
break;
case CONNECTED:
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (LocalAddress) value);
} else {
channel.closeNow(future);
}
break;
case CONNECTED:
if (value != null) {
connect(channel, future, (LocalAddress) value);
break;
case INTEREST_OPS:
break;
} else {
channel.closeNow(future);
}
break;
case INTEREST_OPS:
// TODO: Implement traffic control.
break;
}
}
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
channel.pairedChannel.writeBuffer.offer(event);
channel.pairedChannel.writeNow(channel.pairedChannel);
channel.writeBuffer.offer(event);
channel.flushWriteBuffer();
}
}
private void connect(LocalChannel channel, ChannelFuture future, LocalAddress localAddress) throws Exception {
private void bind(LocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
try {
if (!LocalChannelRegistry.register(localAddress, channel)) {
throw new ChannelException("address already in use: " + localAddress);
}
if (!channel.bound.compareAndSet(false, true)) {
throw new ChannelException("already bound");
}
channel.localAddress = localAddress;
future.setSuccess();
fireChannelBound(channel, localAddress);
} catch (Throwable t) {
LocalChannelRegistry.unregister(localAddress);
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void connect(LocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
if (!(remoteChannel instanceof LocalServerChannel)) {
future.setFailure(new ConnectException("connection refused"));
return;
}
LocalServerChannel serverChannel = (LocalServerChannel) remoteChannel;
ChannelPipeline pipeline;
try {
pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
} catch (Exception e) {
future.setFailure(e);
logger.warn(
"Failed to initialize an accepted socket.", e);
return;
}
future.setSuccess();
ChannelPipeline pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
LocalChannel acceptedChannel = new LocalChannel(serverChannel.getFactory(), pipeline, serverSink);
LocalChannel acceptedChannel = new LocalChannel(serverChannel.getFactory(), pipeline, this, channel);
channel.pairedChannel = acceptedChannel;
acceptedChannel.pairedChannel = channel;
Channels.fireChannelConnected(channel, localAddress);
Channels.fireChannelConnected(acceptedChannel, localAddress);
bind(channel, succeededFuture(channel), LocalAddress.newEphemeralInstance());
fireChannelConnected(channel, serverChannel.getLocalAddress());
acceptedChannel.localAddress = serverChannel.getLocalAddress();
acceptedChannel.bound.set(true);
fireChannelBound(acceptedChannel, channel.getRemoteAddress());
fireChannelConnected(acceptedChannel, channel.getLocalAddress());
}
}

View File

@ -23,7 +23,7 @@ package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelConfig;
@ -33,10 +33,14 @@ import org.jboss.netty.channel.ChannelSink;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public class LocalServerChannel extends AbstractServerChannel {
final class LocalServerChannel extends AbstractServerChannel {
final ChannelConfig channelConfig;
volatile LocalAddress localAddress;
final AtomicBoolean bound = new AtomicBoolean();
protected LocalServerChannel(ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) {
super(factory, pipeline, sink);
channelConfig = new LocalChannelConfig();
@ -48,18 +52,23 @@ public class LocalServerChannel extends AbstractServerChannel {
}
public boolean isBound() {
return true;
return isOpen() && bound.get();
}
public boolean isConnected() {
return true;
return false;
}
public SocketAddress getLocalAddress() {
public LocalAddress getLocalAddress() {
return isBound()? localAddress : null;
}
public LocalAddress getRemoteAddress() {
return null;
}
public SocketAddress getRemoteAddress() {
return null;
@Override
protected boolean setClosed() {
return super.setClosed();
}
}

View File

@ -31,23 +31,17 @@ import org.jboss.netty.channel.ChannelSink;
*/
public class LocalServerChannelFactory implements ChannelFactory {
private final String channelName;
ChannelSink sink;
Channel channel;
private final ChannelSink sink = new LocalServerChannelSink();
public LocalServerChannelFactory(String channelName) {
this.channelName = channelName;
sink = new LocalServerChannelSink();
public LocalServerChannelFactory() {
super();
}
public Channel newChannel(ChannelPipeline pipeline) {
if (channel == null) {
channel = new LocalServerChannel(this, pipeline, sink);
}
return channel;
return new LocalServerChannel(this, pipeline, sink);
}
public void releaseExternalResources() {
LocalServerChannels.unregisterServerChannel(channelName);
// Unused
}
}

View File

@ -24,7 +24,9 @@ package org.jboss.netty.channel.local;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
@ -37,67 +39,103 @@ import org.jboss.netty.channel.MessageEvent;
final class LocalServerChannelSink extends AbstractChannelSink {
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
if (e.getChannel() instanceof LocalServerChannel) {
handleServerChannel(event);
}
else if (e.getChannel() instanceof LocalChannel) {
handleLocalChannel(event);
}
Channel channel = e.getChannel();
if (channel instanceof LocalServerChannel) {
handleServerChannel(e);
}
else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
channel.pairedChannel.writeBuffer.offer(event);
channel.pairedChannel.writeNow(channel.pairedChannel);
}
}
private void handleLocalChannel(ChannelStateEvent event) {
LocalChannel localChannel =
(LocalChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
// FIXME: Proper event emission.
case OPEN:
if (Boolean.FALSE.equals(value)) {
future.setSuccess();
fireChannelDisconnected(localChannel);
fireChannelUnbound(localChannel);
fireChannelClosed(localChannel);
fireChannelDisconnected(localChannel.pairedChannel);
fireChannelUnbound(localChannel.pairedChannel);
fireChannelClosed(localChannel.pairedChannel);
}
break;
case BOUND:
break;
else if (channel instanceof LocalChannel) {
handleAcceptedChannel(e);
}
}
private void handleServerChannel(ChannelStateEvent event) {
LocalServerChannel serverChannel =
private void handleServerChannel(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
LocalServerChannel channel =
(LocalServerChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
break;
case BOUND:
if (value != null) {
bind(future, serverChannel);
}
break;
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
bind(channel, future, (LocalAddress) value);
} else {
close(channel, future);
}
break;
}
}
private void bind(ChannelFuture future, LocalServerChannel serverChannel) {
private void handleAcceptedChannel(ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent event = (ChannelStateEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
channel.closeNow(future);
}
break;
case BOUND:
case CONNECTED:
if (value == null) {
channel.closeNow(future);
}
break;
case INTEREST_OPS:
// TODO: Implement traffic control
break;
}
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e;
LocalChannel channel = (LocalChannel) event.getChannel();
channel.writeBuffer.offer(event);
channel.flushWriteBuffer();
}
}
private void bind(LocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
try {
if (!LocalChannelRegistry.register(localAddress, channel)) {
throw new ChannelException("address already in use: " + localAddress);
}
if (!channel.bound.compareAndSet(false, true)) {
throw new ChannelException("already bound");
}
channel.localAddress = localAddress;
future.setSuccess();
fireChannelBound(channel, localAddress);
} catch (Throwable t) {
LocalChannelRegistry.unregister(localAddress);
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void close(LocalServerChannel channel, ChannelFuture future) {
future.setSuccess();
fireChannelBound(serverChannel, serverChannel.getLocalAddress());
if (channel.setClosed()) {
LocalAddress localAddress = channel.localAddress;
if (channel.bound.compareAndSet(true, false)) {
channel.localAddress = null;
LocalChannelRegistry.unregister(localAddress);
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
}
}
}

View File

@ -1,55 +0,0 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.channel.local;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author Trustin Lee (tlee@redhat.com)
*/
public final class LocalServerChannels {
private static final Map<String, LocalServerChannelFactory> factoryMap = new HashMap<String, LocalServerChannelFactory>();
public static LocalServerChannelFactory registerServerChannel(String channelName) {
if (factoryMap.keySet().contains(channelName)) {
throw new IllegalArgumentException("server channel already registered: " + channelName);
}
LocalServerChannelFactory factory = new LocalServerChannelFactory(channelName);
factoryMap.put(channelName, factory);
return factory;
}
public static LocalClientChannelFactory getClientChannelFactory(String channelName) {
LocalServerChannelFactory localServerChannelFactory = factoryMap.get(channelName);
return localServerChannelFactory == null?null:new LocalClientChannelFactory(localServerChannelFactory);
}
public static void unregisterServerChannel(String channelName) {
factoryMap.remove(channelName);
}
private LocalServerChannels() {
// Unused.
}
}

View File

@ -337,7 +337,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
private void close(SelectionKey k) {
k.cancel();
NioSocketChannel ch = (NioSocketChannel) k.attachment();
NioWorker.close(ch, ch.getSucceededFuture());
NioWorker.close(ch, succeededFuture(ch));
}
}

View File

@ -31,7 +31,6 @@ import java.nio.channels.ServerSocketChannel;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -112,9 +111,4 @@ class NioServerSocketChannel extends AbstractServerChannel
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
}

View File

@ -153,11 +153,6 @@ class NioSocketChannel extends AbstractChannel
return super.setClosed();
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {

View File

@ -313,7 +313,7 @@ class NioWorker implements Runnable {
private static void close(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
close(ch, ch.getSucceededFuture());
close(ch, succeededFuture(ch));
}
static void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
@ -411,7 +411,7 @@ class NioWorker implements Runnable {
fireExceptionCaught(channel, t);
if (t instanceof IOException) {
open = false;
close(channel, channel.getSucceededFuture());
close(channel, succeededFuture(channel));
}
}
}
@ -639,7 +639,7 @@ class NioWorker implements Runnable {
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, channel.getSucceededFuture());
close(channel, succeededFuture(channel));
return;
}
@ -652,7 +652,7 @@ class NioWorker implements Runnable {
if (future != null) {
future.setFailure(e);
}
close(channel, channel.getSucceededFuture());
close(channel, succeededFuture(channel));
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}

View File

@ -31,7 +31,6 @@ import java.net.ServerSocket;
import org.jboss.netty.channel.AbstractServerChannel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -113,9 +112,4 @@ class OioServerSocketChannel extends AbstractServerChannel
protected boolean setClosed() {
return super.setClosed();
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
}

View File

@ -96,11 +96,6 @@ abstract class OioSocketChannel extends AbstractChannel
super.setInterestOpsNow(interestOps);
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
abstract PushbackInputStream getInputStream();
abstract OutputStream getOutputStream();

View File

@ -105,7 +105,7 @@ class OioWorker implements Runnable {
channel.workerThread = null;
// Clean up.
close(channel, channel.getSucceededFuture());
close(channel, succeededFuture(channel));
}
static void write(

View File

@ -114,11 +114,6 @@ class ServletClientSocketChannel extends AbstractChannel
super.setInterestOpsNow(interestOps);
}
@Override
protected ChannelFuture getSucceededFuture() {
return super.getSucceededFuture();
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {

View File

@ -79,7 +79,7 @@ class ServletWorker implements Runnable {
channel.workerThread = null;
// Clean up.
close(channel, channel.getSucceededFuture());
close(channel, succeededFuture(channel));
}
static void write(

View File

@ -32,8 +32,8 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalClientChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.example.echo.EchoHandler;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
@ -44,22 +44,23 @@ import org.jboss.netty.handler.codec.string.StringEncoder;
*/
public class LocalExample {
public static void main(String[] args) throws Exception {
LocalServerChannelFactory factory = LocalServerChannels.registerServerChannel("localChannel");
ChannelFactory factory = new LocalServerChannelFactory();
ServerBootstrap bootstrap = new ServerBootstrap(factory);
EchoHandler handler = new EchoHandler();
LocalAddress socketAddress = LocalAddress.getInstance("1");
bootstrap.getPipeline().addLast("handler", handler);
bootstrap.bind(socketAddress);
ChannelFactory channelFactory = LocalServerChannels.getClientChannelFactory("localChannel");
ChannelFactory channelFactory = new LocalClientChannelFactory();
ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
clientBootstrap.getPipeline().addLast("decoder", new StringDecoder());
clientBootstrap.getPipeline().addLast("encoder", new StringEncoder());
clientBootstrap.getPipeline().addLast("handler", new PrintHandler());
ChannelFuture channelFuture = clientBootstrap.connect(socketAddress);
channelFuture.awaitUninterruptibly();
System.out.println("Enter text (quit to end)");
// Read commands from the stdin.
ChannelFuture lastWriteFuture = null;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
@ -80,10 +81,6 @@ public class LocalExample {
channelFuture.getChannel().close();
// Wait until the connection is closed or the connection attempt fails.
channelFuture.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
channelFactory.releaseExternalResources();
factory.releaseExternalResources();
}
@ChannelPipelineCoverage("all")
@ -92,7 +89,7 @@ public class LocalExample {
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");
return null;
return message;
}
}
}

View File

@ -23,9 +23,9 @@ package org.jboss.netty.example.servlet;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
import org.jboss.netty.channel.local.LocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.example.echo.EchoHandler;
/**
@ -38,15 +38,17 @@ import org.jboss.netty.example.echo.EchoHandler;
*/
public class LocalTransportRegister {
private final ChannelFactory factory = new LocalServerChannelFactory();
private volatile Channel serverChannel;
public void start() {
LocalServerChannelFactory serverChannelFactory = LocalServerChannels.registerServerChannel("org.jboss.netty.exampleChannel");
ServerBootstrap serverBootstrap = new ServerBootstrap(serverChannelFactory);
ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
EchoHandler handler = new EchoHandler();
serverBootstrap.getPipeline().addLast("handler", handler);
Channel channel = serverBootstrap.bind(LocalAddress.getInstance("localAddress"));
serverChannel = serverBootstrap.bind(LocalAddress.getInstance("localAddress"));
}
public void stop() {
LocalServerChannels.unregisterServerChannel("org.jboss.netty.exampleChannel");
serverChannel.close();
}
}

View File

@ -44,7 +44,7 @@ import org.jboss.netty.handler.codec.string.StringEncoder;
<web-app xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
version="2.4">
<!--the name of the channel, this should be a registered local channel. see LocalTransportRegister-->
<!--the name of the channel, this should be a registered local channel. see LocalTransportRegister-->
<context-param>
<param-name>serverChannelName</param-name>
<param-value>org.jboss.netty.exampleChannel</param-value>
@ -124,7 +124,7 @@ public class ServletClientExample {
protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
String message = (String) msg;
System.out.println("received message back '" + message + "'");
return null;
return message;
}
}
}

View File

@ -26,7 +26,7 @@ import javax.servlet.ServletContextListener;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.local.LocalServerChannels;
import org.jboss.netty.channel.local.LocalClientChannelFactory;
/**
* A context listener that creates a client bootstrap that uses a local channel factory. The local channel factory should
@ -49,29 +49,18 @@ public class NettyServletContextListener implements ServletContextListener {
static final String BOOTSTRAP_PROP = "bootstrap";
private final ChannelFactory factory = new LocalClientChannelFactory();
public void contextInitialized(ServletContextEvent context) {
String channelName = context.getServletContext().getInitParameter(SERVER_CHANNEL_PROP);
if (channelName != null) {
String name = channelName.trim();
ChannelFactory channelFactory = LocalServerChannels.getClientChannelFactory(name);
if (channelFactory != null) {
context.getServletContext().setAttribute(BOOTSTRAP_PROP, new ClientBootstrap(channelFactory));
}
else {
throw new IllegalArgumentException("channel factory " + name + " not registered");
}
String timeoutParam = context.getServletContext().getInitParameter(RECONNECT_PROP);
context.getServletContext().setAttribute(RECONNECT_PROP, timeoutParam == null?DEFAULT_RECONNECT_TIMEOUT:Long.decode(timeoutParam.trim()));
String streaming = context.getServletContext().getInitParameter(STREAMING_PROP);
context.getServletContext().setAttribute(STREAMING_PROP, streaming == null?DEFAULT_IS_STREAMING: Boolean.valueOf(streaming.trim()));
}
context.getServletContext().setAttribute(BOOTSTRAP_PROP, new ClientBootstrap(factory));
String timeoutParam = context.getServletContext().getInitParameter(RECONNECT_PROP);
context.getServletContext().setAttribute(RECONNECT_PROP, timeoutParam == null?DEFAULT_RECONNECT_TIMEOUT:Long.decode(timeoutParam.trim()));
String streaming = context.getServletContext().getInitParameter(STREAMING_PROP);
context.getServletContext().setAttribute(STREAMING_PROP, streaming == null?DEFAULT_IS_STREAMING: Boolean.valueOf(streaming.trim()));
}
public void contextDestroyed(ServletContextEvent context) {
// Unused
}
}