Merge branch '3' of github.com:netty/netty into 3

This commit is contained in:
Trustin Lee 2012-04-12 14:15:02 +09:00
commit 2c2d64a75e
17 changed files with 766 additions and 82 deletions

View File

@ -26,7 +26,7 @@
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<packaging>bundle</packaging>
<version>3.4.0.Alpha3-SNAPSHOT</version>
<version>3.4.0.Beta2-SNAPSHOT</version>
<name>The Netty Project</name>
<url>http://netty.io/</url>

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.local;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.group.DefaultChannelGroup;
/**
* The default {@link LocalServerChannelFactory} implementation.
@ -24,6 +25,7 @@ import org.jboss.netty.channel.ChannelSink;
*/
public class DefaultLocalServerChannelFactory implements LocalServerChannelFactory {
private final DefaultChannelGroup group = new DefaultChannelGroup();
private final ChannelSink sink = new LocalServerChannelSink();
/**
@ -34,14 +36,17 @@ public class DefaultLocalServerChannelFactory implements LocalServerChannelFacto
}
public LocalServerChannel newChannel(ChannelPipeline pipeline) {
return new DefaultLocalServerChannel(this, pipeline, sink);
LocalServerChannel channel = new DefaultLocalServerChannel(this, pipeline, sink);
group.add(channel);
return channel;
}
/**
* Does nothing because this implementation does not require any external
* resources.
* Release all the previous created channels. This takes care of calling {@link LocalChannelRegistry#unregister(LocalAddress)}
* for each if them.
*/
public void releaseExternalResources() {
// Unused
group.close().awaitUninterruptibly();
}
}

View File

@ -20,7 +20,6 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.MessageEvent;
@ -29,7 +28,6 @@ import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.ThreadLocalBoolean;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
@ -163,14 +161,6 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
super.setInterestOpsNow(interestOps);
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return getUnsupportedOperationFuture();
}
}
@Override
public int getInterestOps() {

View File

@ -112,7 +112,7 @@ abstract class AbstractNioWorker implements Worker {
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
private final boolean allowShutdownOnIdle;
@ -431,9 +431,8 @@ abstract class AbstractNioWorker implements Worker {
}
protected abstract boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel);
private void write0(AbstractNioChannel<?> channel) {
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
@ -546,7 +545,7 @@ abstract class AbstractNioWorker implements Worker {
return Thread.currentThread() == channel.worker.thread;
}
private void setOpWrite(AbstractNioChannel<?> channel) {
protected void setOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
if (key == null) {
@ -569,7 +568,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void clearOpWrite(AbstractNioChannel<?> channel) {
protected void clearOpWrite(AbstractNioChannel<?> channel) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
if (key == null) {
@ -638,7 +637,7 @@ abstract class AbstractNioWorker implements Worker {
}
}
private void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
protected void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
Exception cause = null;
boolean fireExceptionCaught = false;

View File

@ -24,14 +24,18 @@ import static org.jboss.netty.channel.Channels.succeededFuture;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
@ -206,5 +210,139 @@ public class NioDatagramWorker extends AbstractNioWorker {
}
}
}
@Override
public void writeFromUserCode(final AbstractNioChannel<?> channel) {
/*
* Note that we are not checking if the channel is connected. Connected
* has a different meaning in UDP and means that the channels socket is
* configured to only send and receive from a given remote peer.
*/
if (!channel.isBound()) {
cleanUpWriteBuffer(channel);
return;
}
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
return;
}
write0(channel);
}
@Override
protected void write0(final AbstractNioChannel<?> channel) {
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
// inform the channel that write is in-progress
channel.inWriteNowLoop = true;
// loop forever...
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SocketSendBufferPool.SendBuffer buf;
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
try {
long localWrittenBytes = 0;
SocketAddress raddr = evt.getRemoteAddress();
if (raddr == null) {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
} else {
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.transferTo(ch, raddr);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
}
if (localWrittenBytes > 0 || buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written at all - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
break;
}
} catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (final Throwable t) {
buf.release();
ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
channel.inWriteNowLoop = false;
// Initially, the following block was executed after releasing
// the writeLock, but there was a race condition, and it has to be
// executed before releasing the writeLock:
//
// https://issues.jboss.org/browse/NETTY-410
//
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
Channels.fireWriteComplete(channel, writtenBytes);
}
}

View File

@ -16,10 +16,12 @@
package org.jboss.netty.channel.socket.nio;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
@ -91,4 +93,14 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel>
InetSocketAddress getRemoteSocketAddress() throws Exception {
return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
}
@Override
public ChannelFuture write(Object message, SocketAddress remoteAddress) {
if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
return super.write(message, null);
} else {
return getUnsupportedOperationFuture();
}
}
}

View File

@ -186,6 +186,11 @@ public class NioWorker extends AbstractNioWorker {
channel.setConnected();
future.setSuccess();
}
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
} catch (IOException e) {
if (future != null) {
future.setFailure(e);
@ -197,10 +202,6 @@ public class NioWorker extends AbstractNioWorker {
}
}
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
}
}

View File

@ -114,13 +114,13 @@ class OioWorker extends AbstractOioWorker<OioSocketChannel> {
a.getBytes(a.readerIndex(), out, length);
}
}
future.setSuccess();
if (iothread) {
fireWriteComplete(channel, length);
} else {
fireWriteCompleteLater(channel, length);
}
future.setSuccess();
} catch (Throwable t) {
// Convert 'SocketException: Socket closed' to

View File

@ -15,13 +15,16 @@
*/
package org.jboss.netty.handler.codec.http.websocketx;
import java.io.UnsupportedEncodingException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.util.CharsetUtil;
/**
* Web Socket Frame for closing the connection
*/
public class CloseWebSocketFrame extends WebSocketFrame {
/**
* Creates a new empty close frame.
*/
@ -29,6 +32,68 @@ public class CloseWebSocketFrame extends WebSocketFrame {
setBinaryData(ChannelBuffers.EMPTY_BUFFER);
}
/**
* Creates a new empty close frame with closing status code and reason text
*
* @param statusCode
* Integer status code as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a>. For
* example, <tt>1000</tt> indicates normal closure.
* @param reasonText
* Reason text. Set to null if no text.
*/
public CloseWebSocketFrame(int statusCode, String reasonText) {
this(true, 0, statusCode, reasonText);
}
/**
* Creates a new close frame with no losing status code and no reason text
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv) {
this(finalFragment, rsv, null);
}
/**
* Creates a new close frame with closing status code and reason text
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param statusCode
* Integer status code as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a>. For
* example, <tt>1000</tt> indicates normal closure.
* @param reasonText
* Reason text. Set to null if no text.
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv, int statusCode, String reasonText) {
setFinalFragment(finalFragment);
setRsv(rsv);
byte[] reasonBytes = new byte[0];
if (reasonText != null) {
try {
reasonBytes = reasonText.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
// This should never happen, anyway provide a fallback here
reasonBytes = reasonText.getBytes();
}
}
ChannelBuffer binaryData = ChannelBuffers.buffer(2 + reasonBytes.length);
binaryData.writeShort(statusCode);
if (reasonBytes.length > 0) {
binaryData.writeBytes(reasonBytes);
}
binaryData.readerIndex(0);
setBinaryData(binaryData);
}
/**
* Creates a new close frame
*
@ -36,10 +101,51 @@ public class CloseWebSocketFrame extends WebSocketFrame {
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame. Must be 2 byte integer followed by optional UTF-8 encoded string.
*/
public CloseWebSocketFrame(boolean finalFragment, int rsv) {
public CloseWebSocketFrame(boolean finalFragment, int rsv, ChannelBuffer binaryData) {
setFinalFragment(finalFragment);
setRsv(rsv);
if (binaryData == null) {
setBinaryData(ChannelBuffers.EMPTY_BUFFER);
} else {
setBinaryData(binaryData);
}
}
/**
* Returns the closing status code as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a>. If
* a status code is set, -1 is returned.
*/
public int getStatusCode() {
ChannelBuffer binaryData = this.getBinaryData();
if (binaryData == null || binaryData.capacity() == 0) {
return -1;
}
binaryData.readerIndex(0);
int statusCode = binaryData.readShort();
binaryData.readerIndex(0);
return statusCode;
}
/**
* Returns the reason text as per <a href="http://tools.ietf.org/html/rfc6455#section-7.4">RFC 6455</a> If a reason
* text is not supplied, an empty string is returned.
*/
public String getReasonText() {
ChannelBuffer binaryData = this.getBinaryData();
if (binaryData == null || binaryData.capacity() <= 2) {
return "";
}
binaryData.readerIndex(2);
String reasonText = binaryData.toString(CharsetUtil.UTF_8);
binaryData.readerIndex(0);
return reasonText;
}
@Override

View File

@ -284,8 +284,9 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
} else if (frameOpcode == OPCODE_PONG) {
return new PongWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
} else if (frameOpcode == OPCODE_CLOSE) {
checkCloseFrameBody(channel, framePayload);
receivedClosingHandshake = true;
return new CloseWebSocketFrame(frameFinalFlag, frameRsv);
return new CloseWebSocketFrame(frameFinalFlag, frameRsv, framePayload);
}
// Processing for possible fragmented messages for text and binary
@ -391,4 +392,38 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
protocolViolation(channel, "invalid UTF-8 bytes");
}
}
protected void checkCloseFrameBody(Channel channel, ChannelBuffer buffer) throws CorruptedFrameException {
if (buffer == null || buffer.capacity() == 0) {
return;
}
if (buffer.capacity() == 1) {
protocolViolation(channel, "Invalid close frame body");
}
// Save reader index
int idx = buffer.readerIndex();
buffer.readerIndex(0);
// Must have 2 byte integer within the valid range
int statusCode = buffer.readShort();
if ((statusCode >= 0 && statusCode <= 999) || (statusCode >= 1004 && statusCode <= 1006)
|| (statusCode >= 1012 && statusCode <= 2999)) {
protocolViolation(channel, "Invalid close frame status code: " + statusCode);
}
// May have UTF-8 message
if (buffer.readableBytes() > 0) {
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
try {
new UTF8Output(b);
} catch (UTF8Exception ex) {
protocolViolation(channel, "Invalid close frame reason text. Invalid UTF-8 bytes");
}
}
// Restore reader index
buffer.readerIndex(idx);
}
}

View File

@ -124,15 +124,16 @@ final class InfBlocks {
int m; // bytes to end of window or read pointer
// copy input/output information to locals (UPDATE macro restores)
p = z.next_in_index;
n = z.avail_in;
b = bitb;
k = bitk;
q = write;
m = q < read? read - q - 1 : end - q;
{
p = z.next_in_index;
n = z.avail_in;
b = bitb;
k = bitk;
}
{
q = write;
m = q < read? read - q - 1 : end - q;
}
// process input based on current state
while (true) {
@ -160,44 +161,51 @@ final class InfBlocks {
switch (t >>> 1) {
case 0: // stored
{
b >>>= 3;
k -= 3;
}
t = k & 7; // go to byte boundary
b >>>= t;
k -= t;
{
b >>>= t;
k -= t;
}
mode = LENS; // get length of stored block
break;
case 1: // fixed
{
int[] bl = new int[1];
int[] bd = new int[1];
int[][] tl = new int[1][];
int[][] td = new int[1][];
InfTree.inflate_trees_fixed(bl, bd, tl, td);
codes.init(bl[0], bd[0], tl[0], 0, td[0], 0);
}
b >>>= 3;
k -= 3;
{
b >>>= 3;
k -= 3;
}
mode = CODES;
break;
case 2: // dynamic
{
b >>>= 3;
k -= 3;
}
mode = TABLE;
break;
case 3: // illegal
{
b >>>= 3;
k -= 3;
}
mode = BAD;
z.msg = "invalid block type";
r = JZlib.Z_DATA_ERROR;
@ -344,9 +352,10 @@ final class InfBlocks {
}
}
b >>>= 14;
k -= 14;
{
b >>>= 14;
k -= 14;
}
index = 0;
mode = BTREE;
@ -371,10 +380,10 @@ final class InfBlocks {
blens[border[index ++]] = b & 7;
b >>>= 3;
k -= 3;
{
b >>>= 3;
k -= 3;
}
}
while (index < 19) {
@ -496,36 +505,36 @@ final class InfBlocks {
}
tb[0] = -1;
int[] bl = new int[1];
int[] bd = new int[1];
int[] tl = new int[1];
int[] td = new int[1];
bl[0] = 9; // must be <= 9 for lookahead assumptions
bd[0] = 6; // must be <= 9 for lookahead assumptions
{
int[] bl = new int[1];
int[] bd = new int[1];
int[] tl = new int[1];
int[] td = new int[1];
bl[0] = 9; // must be <= 9 for lookahead assumptions
bd[0] = 6; // must be <= 9 for lookahead assumptions
t = table;
t = inftree.inflate_trees_dynamic(257 + (t & 0x1f),
1 + (t >> 5 & 0x1f), blens, bl, bd, tl, td, hufts,
z);
t = table;
t = inftree.inflate_trees_dynamic(257 + (t & 0x1f),
1 + (t >> 5 & 0x1f), blens, bl, bd, tl, td, hufts,
z);
if (t != JZlib.Z_OK) {
if (t == JZlib.Z_DATA_ERROR) {
blens = null;
mode = BAD;
if (t != JZlib.Z_OK) {
if (t == JZlib.Z_DATA_ERROR) {
blens = null;
mode = BAD;
}
r = t;
bitb = b;
bitk = k;
z.avail_in = n;
z.total_in += p - z.next_in_index;
z.next_in_index = p;
write = q;
return inflate_flush(z, r);
}
r = t;
bitb = b;
bitk = k;
z.avail_in = n;
z.total_in += p - z.next_in_index;
z.next_in_index = p;
write = q;
return inflate_flush(z, r);
codes.init(bl[0], bd[0], hufts, tl[0], hufts, td[0]);
}
codes.init(bl[0], bd[0], hufts, tl[0], hufts, td[0]);
mode = CODES;
case CODES:
bitb = b;

View File

@ -0,0 +1,157 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.local;
import org.junit.Assert;
import org.junit.Test;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.local.DefaultLocalClientChannelFactory;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
public class LocalAddressTest {
private static String LOCAL_ADDR_ID = "test.id";
@Test
public void localConnectOK()
throws Exception {
ClientBootstrap cb = new ClientBootstrap(new DefaultLocalClientChannelFactory());
ServerBootstrap sb = new ServerBootstrap(new DefaultLocalServerChannelFactory());
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline()
throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("test.handler", new TestHandler());
return pipeline;
}
});
sb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline()
throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("test.handler", new TestHandler());
return pipeline;
}
});
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
// Start server
sb.bind(addr);
// Connect to the server
ChannelFuture connectFuture = cb.connect(addr);
connectFuture.awaitUninterruptibly();
// Send a message event up the pipeline.
Channels.fireMessageReceived(connectFuture.getChannel(), "Hello, World");
// Close the channel
connectFuture.getChannel().close();
// Wait until the connection is closed, or the connection attempt fails
connectFuture.getChannel().getCloseFuture().awaitUninterruptibly();
sb.releaseExternalResources();
cb.releaseExternalResources();
Assert.assertTrue(String.format("Expected null, got channel '%s' for local address '%s'", LocalChannelRegistry.getChannel(addr), addr), LocalChannelRegistry.getChannel(addr) == null);
}
@Test
public void localConnectAgain()
throws Exception {
ClientBootstrap cb = new ClientBootstrap(new DefaultLocalClientChannelFactory());
ServerBootstrap sb = new ServerBootstrap(new DefaultLocalServerChannelFactory());
cb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline()
throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("test.handler", new TestHandler());
return pipeline;
}
});
sb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline()
throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("test.handler", new TestHandler());
return pipeline;
}
});
LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID);
// Start server
sb.bind(addr);
// Connect to the server
ChannelFuture connectFuture = cb.connect(addr);
connectFuture.awaitUninterruptibly();
// Send a message event up the pipeline.
Channels.fireMessageReceived(connectFuture.getChannel(), "Hello, World");
// Close the channel
connectFuture.getChannel().close();
// Wait until the connection is closed, or the connection attempt fails
connectFuture.getChannel().getCloseFuture().awaitUninterruptibly();
sb.releaseExternalResources();
cb.releaseExternalResources();
Assert.assertTrue(String.format("Expected null, got channel '%s' for local address '%s'", LocalChannelRegistry.getChannel(addr), addr), LocalChannelRegistry.getChannel(addr) == null);
}
public static class TestHandler
extends SimpleChannelUpstreamHandler {
public TestHandler() {
}
@Override
public void handleUpstream(ChannelHandlerContext ctx,
ChannelEvent e)
throws Exception {
System.err.println(String.format("Received upstream event '%s'", e));
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import static org.junit.Assert.assertTrue;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractDatagramTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testSimpleSend() throws Throwable {
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
final CountDownLatch latch = new CountDownLatch(1);
sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.messageReceived(ctx, e);
Assert.assertEquals(1,((ChannelBuffer)e.getMessage()).readInt());
latch.countDown();
}
});
cb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler());
Channel sc = sb.bind(new InetSocketAddress("127.0.0.1",0));
Channel cc = cb.bind(new InetSocketAddress("127.0.0.1", 0));
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeInt(1);
cc.write(buf, sc.getLocalAddress());
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
public class NioNioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class NioOioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class OioNioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new NioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
public class OioOioDatagramTest extends AbstractDatagramTest{
@Override
protected DatagramChannelFactory newServerSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
@Override
protected DatagramChannelFactory newClientSocketChannelFactory(Executor executor) {
return new OioDatagramChannelFactory(executor);
}
}