Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Jestan Nirojan 2012-09-03 16:53:05 +08:00
commit 359d09bd4d
129 changed files with 3217 additions and 1244 deletions

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty</artifactId>
@ -28,6 +28,31 @@
<name>Netty/All-in-One</name>
<properties>
<quickbuild>true</quickbuild>
</properties>
<profiles>
<profile>
<id>full</id>
<properties>
<quickbuild>false</quickbuild>
</properties>
</profile>
<profile>
<id>sonatype-oss-release</id>
<properties>
<quickbuild>false</quickbuild>
</properties>
</profile>
<profile>
<id>release</id>
<properties>
<quickbuild>false</quickbuild>
</properties>
</profile>
</profiles>
<dependencies>
<!-- The example depends on all modules either directly or transitively -->
<dependency>
@ -212,92 +237,74 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>generate-xref</id>
<phase>package</phase>
<goals>
<goal>jxr</goal>
</goals>
</execution>
</executions>
<configuration>
<skip>${quickbuild}</skip>
<inputEncoding>UTF-8</inputEncoding>
<outputEncoding>UTF-8</outputEncoding>
<linkJavadoc>true</linkJavadoc>
<destDir>${project.build.directory}/xref</destDir>
<javadocDir>${project.build.directory}/api</javadocDir>
<docTitle>Netty Source Xref (${project.version})</docTitle>
<windowTitle>Netty Source Xref (${project.version})</windowTitle>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<skip>${quickbuild}</skip>
<excludePackageNames>*.internal,*.example</excludePackageNames>
<doclet>org.jboss.apiviz.APIviz</doclet>
<docletPath>${project.basedir}/lib/apiviz-1.3.1-jdk7.jar</docletPath>
<docfilessubdirs>true</docfilessubdirs>
<useStandardDocletOptions>true</useStandardDocletOptions>
<outputDirectory>${project.build.directory}/api</outputDirectory>
<overview>${basedir}/src/javadoc/overview.html</overview>
<doctitle>Netty API Reference (${project.version})</doctitle>
<windowtitle>Netty API Reference (${project.version})</windowtitle>
<detectJavaApiLink>false</detectJavaApiLink>
<additionalparam>
-link http://docs.oracle.com/javase/7/docs/api/
-link http://code.google.com/apis/protocolbuffers/docs/reference/java/
-link http://docs.oracle.com/javaee/6/api/
-link http://www.osgi.org/javadoc/r4v43/core/
-link http://www.slf4j.org/apidocs/
-link http://commons.apache.org/logging/commons-logging-1.1.1/apidocs/
-link http://logging.apache.org/log4j/1.2/apidocs/
-group "Low-level data representation" io.netty.buffer*
-group "Central interface for all I/O operations" io.netty.channel*
-group "Client &amp; Server bootstrapping utilities" io.netty.bootstrap*
-group "Reusable I/O event interceptors" io.netty.handler*
-group "Miscellaneous" io.netty.logging*:io.netty.util*
-sourceclasspath ${project.build.outputDirectory}
-nopackagediagram
</additionalparam>
<encoding>UTF-8</encoding>
<locale>en_US</locale>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>full</id>
<build>
<plugins>
<plugin>
<artifactId>maven-jxr-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>generate-xref</id>
<phase>package</phase>
<goals>
<goal>jxr</goal>
</goals>
</execution>
</executions>
<configuration>
<inputEncoding>UTF-8</inputEncoding>
<outputEncoding>UTF-8</outputEncoding>
<linkJavadoc>true</linkJavadoc>
<destDir>${project.build.directory}/xref</destDir>
<javadocDir>${project.build.directory}/api</javadocDir>
<docTitle>Netty Source Xref (${project.version})</docTitle>
<windowTitle>Netty Source Xref (${project.version})</windowTitle>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<attach>false</attach>
<excludePackageNames>*.internal,*.example</excludePackageNames>
<doclet>org.jboss.apiviz.APIviz</doclet>
<docletPath>${project.basedir}/lib/apiviz-1.3.1-jdk7.jar</docletPath>
<docfilessubdirs>true</docfilessubdirs>
<useStandardDocletOptions>true</useStandardDocletOptions>
<outputDirectory>${project.build.directory}/api</outputDirectory>
<reportOutputDirectory>${project.build.directory}</reportOutputDirectory>
<destDir>api</destDir>
<charset>UTF-8</charset>
<docencoding>UTF-8</docencoding>
<breakiterator>true</breakiterator>
<version>false</version>
<author>false</author>
<keywords>true</keywords>
<overview>${basedir}/src/javadoc/overview.html</overview>
<doctitle>Netty API Reference (${project.version})</doctitle>
<windowtitle>Netty API Reference (${project.version})</windowtitle>
<detectJavaApiLink>false</detectJavaApiLink>
<additionalparam>
-link http://docs.oracle.com/javase/7/docs/api/
-link http://code.google.com/apis/protocolbuffers/docs/reference/java/
-link http://docs.oracle.com/javaee/6/api/
-link http://www.osgi.org/javadoc/r4v43/core/
-link http://www.slf4j.org/apidocs/
-link http://commons.apache.org/logging/commons-logging-1.1.1/apidocs/
-link http://logging.apache.org/log4j/1.2/apidocs/
-group "Low-level data representation" io.netty.buffer*
-group "Central interface for all I/O operations" io.netty.channel*
-group "Client &amp; Server bootstrapping utilities" io.netty.bootstrap*
-group "Reusable I/O event interceptors" io.netty.handler*
-group "Miscellaneous" io.netty.logging*:io.netty.util*
-sourceclasspath ${project.build.outputDirectory}
-nopackagediagram
</additionalparam>
<encoding>UTF-8</encoding>
<locale>en_US</locale>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-buffer</artifactId>

View File

@ -176,18 +176,19 @@ public abstract class AbstractByteBuf implements ByteBuf {
@Override
public void ensureWritableBytes(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: 0+)", minWritableBytes));
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (exceeds maxCapacity(%d))", minWritableBytes, maxCapacity));
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d)",
writerIndex, minWritableBytes, maxCapacity));
}
// Normalize the current capacity to the power of 2.
@ -197,6 +198,36 @@ public abstract class AbstractByteBuf implements ByteBuf {
capacity(newCapacity);
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return 0;
}
if (minWritableBytes > maxCapacity - writerIndex) {
if (force) {
if (capacity() == maxCapacity()) {
return 1;
}
capacity(maxCapacity());
return 3;
}
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return 2;
}
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page

View File

@ -443,23 +443,35 @@ public interface ByteBuf extends ChannelBuf, Comparable<ByteBuf> {
* Makes sure the number of {@linkplain #writableBytes() the writable bytes}
* is equal to or greater than the specified value. If there is enough
* writable bytes in this buffer, this method returns with no side effect.
* Otherwise:
* <ul>
* <li>a non-dynamic buffer will throw an {@link IndexOutOfBoundsException}.</li>
* <li>a dynamic buffer will expand its capacity so that the number of the
* {@link #writableBytes() writable bytes} becomes equal to or greater
* than the specified value. The expansion involves the reallocation of
* the internal buffer and consequently memory copy.</li>
* </ul>
* Otherwise, it raises an {@link IllegalArgumentException}.
*
* @param writableBytes
* @param minWritableBytes
* the expected minimum number of writable bytes
* @throws IndexOutOfBoundsException
* if {@linkplain #writableBytes() the writable bytes} of this
* buffer is less than the specified value and if this buffer is
* not a dynamic buffer
* if {@link #writerIndex()} + {@code minWritableBytes} > {@link #maxCapacity()}
*/
void ensureWritableBytes(int writableBytes);
void ensureWritableBytes(int minWritableBytes);
/**
* Tries to make sure the number of {@linkplain #writableBytes() the writable bytes}
* is equal to or greater than the specified value. Unlike {@link #ensureWritableBytes(int)},
* this method does not raise an exception but returns a code.
*
* @param minWritableBytes
* the expected minimum number of writable bytes
* @param force
* When {@link #writerIndex()} + {@code minWritableBytes} > {@link #maxCapacity()}:
* <ul>
* <li>{@code true} - the capacity of the buffer is expanded to {@link #maxCapacity()}</li>
* <li>{@code false} - the capacity of the buffer is unchanged</li>
* </ul>
* @return {@code 0} if the buffer has enough writable bytes, and its capacity is unchanged.
* {@code 1} if the buffer does not have enough bytes, and its capacity is unchanged.
* {@code 2} if the buffer has enough writable bytes, and its capacity has been increased.
* {@code 3} if the buffer does not have enough bytes, but its capacity has been
* increased to its maximum.
*/
int ensureWritableBytes(int minWritableBytes, boolean force);
/**
* Gets a boolean at the specified absolute (@code index) in this buffer.

View File

@ -1166,12 +1166,17 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
}
components.subList(0, firstComponentId).clear();
// Replace the first readable component with a new slice.
// Remove or replace the first readable component with a new slice.
Component c = components.get(0);
int adjustment = readerIndex - c.offset;
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
c.buf.unsafe().release();
components.set(0, newC);
if (adjustment == c.length) {
// new slice would be empty, so remove instead
components.remove(0);
} else {
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
c.buf.unsafe().release();
components.set(0, newC);
}
// Update indexes and markers.
updateComponentOffsets(0);

View File

@ -172,6 +172,11 @@ public class SwappedByteBuf implements WrappedByteBuf {
buf.ensureWritableBytes(writableBytes);
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
return buf.ensureWritableBytes(minWritableBytes, force);
}
@Override
public boolean getBoolean(int index) {
return buf.getBoolean(index);

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-codec-http</artifactId>

View File

@ -94,7 +94,7 @@ public final class CookieDecoder {
String commentURL = null;
String domain = null;
String path = null;
long maxAge = -1;
long maxAge = Long.MIN_VALUE;
List<Integer> ports = new ArrayList<Integer>(2);
for (int j = i + 1; j < names.size(); j++, i++) {

View File

@ -199,11 +199,6 @@ public class DefaultCookie implements Cookie {
@Override
public void setMaxAge(long maxAge) {
if (maxAge < -1) {
throw new IllegalArgumentException(
"maxAge must be either -1, 0, or a positive integer: " +
maxAge);
}
this.maxAge = maxAge;
}

View File

@ -31,7 +31,7 @@ public class DefaultHttpMessage implements HttpMessage {
private final HttpHeaders headers = new HttpHeaders();
private HttpVersion version;
private ByteBuf content = Unpooled.EMPTY_BUFFER;
private boolean chunked;
private HttpTransferEncoding te = HttpTransferEncoding.SINGLE;
/**
* Creates a new instance.
@ -61,19 +61,31 @@ public class DefaultHttpMessage implements HttpMessage {
}
@Override
public boolean isChunked() {
if (chunked) {
return true;
} else {
return HttpCodecUtil.isTransferEncodingChunked(this);
}
public HttpTransferEncoding getTransferEncoding() {
return te;
}
@Override
public void setChunked(boolean chunked) {
this.chunked = chunked;
if (chunked) {
public void setTransferEncoding(HttpTransferEncoding te) {
if (te == null) {
throw new NullPointerException("te (transferEncoding)");
}
this.te = te;
switch (te) {
case SINGLE:
HttpCodecUtil.removeTransferEncodingChunked(this);
break;
case STREAMED:
HttpCodecUtil.removeTransferEncodingChunked(this);
setContent(Unpooled.EMPTY_BUFFER);
break;
case CHUNKED:
if (!HttpCodecUtil.isTransferEncodingChunked(this)) {
addHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
}
removeHeader(HttpHeaders.Names.CONTENT_LENGTH);
setContent(Unpooled.EMPTY_BUFFER);
break;
}
}
@ -87,10 +99,12 @@ public class DefaultHttpMessage implements HttpMessage {
if (content == null) {
content = Unpooled.EMPTY_BUFFER;
}
if (content.readable() && isChunked()) {
if (!getTransferEncoding().isSingle() && content.readable()) {
throw new IllegalArgumentException(
"non-empty content disallowed if this.chunked == true");
"non-empty content disallowed if this.transferEncoding != SINGLE");
}
this.content = content;
}
@ -134,7 +148,11 @@ public class DefaultHttpMessage implements HttpMessage {
@Override
public ByteBuf getContent() {
return content;
if (getTransferEncoding() == HttpTransferEncoding.SINGLE) {
return content;
} else {
return Unpooled.EMPTY_BUFFER;
}
}
@Override
@ -145,8 +163,8 @@ public class DefaultHttpMessage implements HttpMessage {
buf.append(getProtocolVersion().getText());
buf.append(", keepAlive: ");
buf.append(HttpHeaders.isKeepAlive(this));
buf.append(", chunked: ");
buf.append(isChunked());
buf.append(", transferEncoding: ");
buf.append(getTransferEncoding());
buf.append(')');
buf.append(StringUtil.NEWLINE);
appendHeaders(buf);

View File

@ -68,8 +68,8 @@ public class DefaultHttpRequest extends DefaultHttpMessage implements HttpReques
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
buf.append("(chunked: ");
buf.append(isChunked());
buf.append("(transferEncoding: ");
buf.append(getTransferEncoding());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getMethod().toString());

View File

@ -52,8 +52,8 @@ public class DefaultHttpResponse extends DefaultHttpMessage implements HttpRespo
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(getClass().getSimpleName());
buf.append("(chunked: ");
buf.append(isChunked());
buf.append("(transferEncoding: ");
buf.append(getTransferEncoding());
buf.append(')');
buf.append(StringUtil.NEWLINE);
buf.append(getProtocolVersion().getText());

View File

@ -26,7 +26,6 @@ import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
import java.util.List;
import java.util.Map.Entry;
/**
@ -128,29 +127,27 @@ public class HttpChunkAggregator extends MessageToMessageDecoder<Object, HttpMes
ctx.write(CONTINUE.duplicate());
}
if (m.isChunked()) {
// A chunked message - remove 'Transfer-Encoding' header,
// initialize the cumulative buffer, and wait for incoming chunks.
List<String> encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
encodings.remove(HttpHeaders.Values.CHUNKED);
if (encodings.isEmpty()) {
m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
}
m.setChunked(false);
switch (m.getTransferEncoding()) {
case SINGLE:
this.currentMessage = null;
return m;
case STREAMED:
case CHUNKED:
// A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
m.setTransferEncoding(HttpTransferEncoding.SINGLE);
m.setContent(Unpooled.compositeBuffer(maxCumulationBufferComponents));
this.currentMessage = m;
return null;
} else {
// Not a chunked message - pass through.
this.currentMessage = null;
return m;
default:
throw new Error();
}
} else if (msg instanceof HttpChunk) {
// Sanity check
if (currentMessage == null) {
throw new IllegalStateException(
"received " + HttpChunk.class.getSimpleName() +
" without " + HttpMessage.class.getSimpleName());
" without " + HttpMessage.class.getSimpleName() +
" or last message's transfer encoding was 'SINGLE'");
}
// Merge the received chunk into the content of the current message.

View File

@ -92,7 +92,7 @@ public class HttpClientCodec extends CombinedChannelHandler {
if (failOnMissingResponse) {
// check if the request is chunked if so do not increment
if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) {
if (msg instanceof HttpRequest && ((HttpMessage) msg).getTransferEncoding().isSingle()) {
requestResponseCounter.incrementAndGet();
} else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
// increment as its the last chunk
@ -127,8 +127,8 @@ public class HttpClientCodec extends CombinedChannelHandler {
return;
}
// check if its a HttpMessage and its not chunked
if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
// check if it's an HttpMessage and its transfer encoding is SINGLE.
if (msg instanceof HttpMessage && ((HttpMessage) msg).getTransferEncoding().isSingle()) {
requestResponseCounter.decrementAndGet();
} else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
requestResponseCounter.decrementAndGet();

View File

@ -154,7 +154,11 @@ final class HttpCodecUtil {
static void removeTransferEncodingChunked(HttpMessage m) {
List<String> values = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING);
values.remove(HttpHeaders.Values.CHUNKED);
m.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, values);
if (values.isEmpty()) {
m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
} else {
m.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, values);
}
}
static boolean isContentLengthSet(HttpMessage m) {

View File

@ -63,7 +63,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
} else if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg;
decoder = null;
cleanup();
// Determine the content encoding.
String contentEncoding = m.getHeader(HttpHeaders.Names.CONTENT_ENCODING);
@ -73,7 +73,8 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
contentEncoding = HttpHeaders.Values.IDENTITY;
}
boolean hasContent = m.isChunked() || m.getContent().readable();
boolean hasContent =
m.getTransferEncoding().isMultiple() || m.getContent().readable();
if (hasContent && (decoder = newContentDecoder(contentEncoding)) != null) {
// Decode the content and remove or replace the existing headers
// so that the message looks like a decoded message.
@ -81,7 +82,7 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
HttpHeaders.Names.CONTENT_ENCODING,
getTargetContentEncoding(contentEncoding));
if (!m.isChunked()) {
if (m.getTransferEncoding().isSingle()) {
ByteBuf content = m.getContent();
// Decode the content
ByteBuf newContent = Unpooled.buffer();
@ -151,6 +152,25 @@ public abstract class HttpContentDecoder extends MessageToMessageDecoder<Object,
return HttpHeaders.Values.IDENTITY;
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
cleanup();
super.afterRemove(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cleanup();
super.channelInactive(ctx);
}
private void cleanup() {
if (decoder != null) {
// Clean-up the previous decoder if not cleaned up correctly.
finishDecode(Unpooled.buffer());
}
}
private void decode(ByteBuf in, ByteBuf out) {
decoder.writeInbound(in);
fetchDecoderOutput(out);

View File

@ -88,7 +88,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
} else if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg;
encoder = null;
cleanup();
// Determine the content encoding.
String acceptEncoding = acceptEncodingQueue.poll();
@ -96,7 +96,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
throw new IllegalStateException("cannot send more responses than requests");
}
boolean hasContent = m.isChunked() || m.getContent().readable();
boolean hasContent = m.getTransferEncoding().isMultiple() || m.getContent().readable();
if (!hasContent) {
return m;
}
@ -114,7 +114,7 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
HttpHeaders.Names.CONTENT_ENCODING,
result.getTargetContentEncoding());
if (!m.isChunked()) {
if (m.getTransferEncoding().isSingle()) {
ByteBuf content = m.getContent();
// Encode the content.
ByteBuf newContent = Unpooled.buffer();
@ -176,6 +176,26 @@ public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpMessa
*/
protected abstract Result beginEncode(HttpMessage msg, String acceptEncoding) throws Exception;
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
cleanup();
super.afterRemove(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cleanup();
super.channelInactive(ctx);
}
private void cleanup() {
if (encoder != null) {
// Clean-up the previous encoder if not cleaned up correctly.
finishEncode(Unpooled.buffer());
}
}
private void encode(ByteBuf in, ByteBuf out) {
encoder.writeOutbound(in);
fetchEncoderOutput(out);

View File

@ -94,7 +94,8 @@ public interface HttpMessage {
/**
* Returns the content of this {@link HttpMessage}.
*
* If there is no content or {@link #isChunked()} returns {@code true},
* If there is no content or {@link #getTransferEncoding()} returns
* {@link HttpTransferEncoding#STREAMED} or {@link HttpTransferEncoding#CHUNKED},
* an {@link Unpooled#EMPTY_BUFFER} is returned.
*
* @return A {@link ByteBuf} containing this {@link HttpMessage}'s content
@ -171,43 +172,31 @@ public interface HttpMessage {
void clearHeaders();
/**
* Checks to see if this {@link HttpMessage} is broken into multiple "chunks"
*
* If this returns true, it means that this {@link HttpMessage}
* actually has no content - The {@link HttpChunk}s (which are generated
* by the {@link HttpMessageDecoder} consecutively) contain the actual content.
* <p>
* Please note that this method will keep returning {@code true} if the
* {@code "Transfer-Encoding"} of this message is {@code "chunked"}, even if
* you attempt to override this property by calling {@link #setChunked(boolean)}
* with {@code false}.
* </p>
*
* @return True if this message is chunked, otherwise false
* Returns the transfer encoding of this {@link HttpMessage}.
* <ul>
* <li>{@link HttpTransferEncoding#CHUNKED} - an HTTP message whose {@code "Transfer-Encoding"}
* is {@code "chunked"}.</li>
* <li>{@link HttpTransferEncoding#STREAMED} - an HTTP message which is not chunked, but
* is followed by {@link HttpChunk}s that represent its content. {@link #getContent()}
* returns an empty buffer.</li>
* <li>{@link HttpTransferEncoding#SINGLE} - a self-contained HTTP message which is not chunked
* and {@link #getContent()} returns the full content.</li>
* </ul>
*/
boolean isChunked();
HttpTransferEncoding getTransferEncoding();
/**
* Sets the boolean defining if this {@link HttpMessage} is chunked.
*
* <p>
* If this is set to true, it means that this initial {@link HttpMessage}
* does not contain any content - The content is contained by multiple
* {@link HttpChunk}s, which are generated by the {@link HttpMessageDecoder}
* consecutively.
*
* Because of this, the content of this {@link HttpMessage} becomes
* {@link Unpooled#EMPTY_BUFFER}
* </p>
*
* <p>
* Even if this method is called with {@code false}, {@link #isChunked()}
* will keep returning {@code true} if the {@code "Transfer-Encoding"} of
* this message is {@code "chunked"}.
* </p>
*
* @param chunked True if this message is to be delivered in chunks,
* otherwise false.
* Sets the transfer encoding of this {@link HttpMessage}.
* <ul>
* <li>If set to {@link HttpTransferEncoding#CHUNKED}, the {@code "Transfer-Encoding: chunked"}
* header is set and the {@code "Content-Length"} header and the content of this message are
* removed automatically.</li>
* <li>If set to {@link HttpTransferEncoding#STREAMED}, the {@code "Transfer-Encoding: chunked"}
* header and the content of this message are removed automatically.</li>
* <li>If set to {@link HttpTransferEncoding#SINGLE}, the {@code "Transfer-Encoding: chunked"}
* header is removed automatically.</li>
* </ul>
* For more information about what {@link HttpTransferEncoding} means, see {@link #getTransferEncoding()}.
*/
void setChunked(boolean chunked);
void setTransferEncoding(HttpTransferEncoding te);
}

View File

@ -191,15 +191,10 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
State nextState = readHeaders(buffer);
checkpoint(nextState);
if (nextState == State.READ_CHUNK_SIZE) {
// Chunked encoding
message.setChunked(true);
// Generate HttpMessage first. HttpChunks will follow.
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
return message;
} else if (nextState == State.SKIP_CONTROL_CHARS) {
// No content is expected.
// Remove the headers which are not supposed to be present not
// to confuse subsequent handlers.
message.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
return message;
} else {
long contentLength = HttpHeaders.getContentLength(message, -1);
@ -213,7 +208,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
if (contentLength > maxChunkSize || HttpHeaders.is100ContinueExpected(message)) {
// Generate HttpMessage first. HttpChunks will follow.
checkpoint(State.READ_FIXED_LENGTH_CONTENT_AS_CHUNKS);
message.setChunked(true);
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT_AS_CHUNKS
// state reads data chunk by chunk.
chunkSize = HttpHeaders.getContentLength(message, -1);
@ -224,7 +219,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
if (buffer.readableBytes() > maxChunkSize || HttpHeaders.is100ContinueExpected(message)) {
// Generate HttpMessage first. HttpChunks will follow.
checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS);
message.setChunked(true);
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
return message;
}
break;
@ -240,8 +235,9 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
if (toRead > maxChunkSize) {
toRead = maxChunkSize;
}
if (!message.isChunked()) {
message.setChunked(true);
if (message.getTransferEncoding() != HttpTransferEncoding.STREAMED) {
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
return new Object[] {message, new DefaultHttpChunk(buffer.readBytes(toRead))};
} else {
return new DefaultHttpChunk(buffer.readBytes(toRead));
@ -464,8 +460,8 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
}
contentRead += toRead;
if (length < contentRead) {
if (!message.isChunked()) {
message.setChunked(true);
if (message.getTransferEncoding() != HttpTransferEncoding.STREAMED) {
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
return new Object[] {message, new DefaultHttpChunk(read(buffer, toRead))};
} else {
return new DefaultHttpChunk(read(buffer, toRead));
@ -533,14 +529,10 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
State nextState;
if (isContentAlwaysEmpty(message)) {
message.setTransferEncoding(HttpTransferEncoding.SINGLE);
nextState = State.SKIP_CONTROL_CHARS;
} else if (message.isChunked()) {
// HttpMessage.isChunked() returns true when either:
// 1) HttpMessage.setChunked(true) was called or
// 2) 'Transfer-Encoding' is 'chunked'.
// Because this decoder did not call HttpMessage.setChunked(true)
// yet, HttpMessage.isChunked() should return true only when
// 'Transfer-Encoding' is 'chunked'.
} else if (HttpCodecUtil.isTransferEncodingChunked(message)) {
message.setTransferEncoding(HttpTransferEncoding.CHUNKED);
nextState = State.READ_CHUNK_SIZE;
} else if (HttpHeaders.getContentLength(message, -1) >= 0) {
nextState = State.READ_FIXED_LENGTH_CONTENT;

View File

@ -21,11 +21,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.UnsupportedMessageTypeException;
import io.netty.handler.codec.http.HttpHeaders.Names;
import io.netty.handler.codec.http.HttpHeaders.Values;
import io.netty.util.CharsetUtil;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
@ -47,7 +44,7 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
private static final ByteBuf LAST_CHUNK =
copiedBuffer("0\r\n\r\n", CharsetUtil.US_ASCII);
private boolean transferEncodingChunked;
private HttpTransferEncoding lastTE;
/**
* Creates a new instance.
@ -64,26 +61,14 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (msg instanceof HttpMessage) {
HttpMessage m = (HttpMessage) msg;
boolean contentMustBeEmpty;
if (m.isChunked()) {
// if Content-Length is set then the message can't be HTTP chunked
if (HttpCodecUtil.isContentLengthSet(m)) {
contentMustBeEmpty = false;
transferEncodingChunked = false;
HttpCodecUtil.removeTransferEncodingChunked(m);
} else {
// check if the Transfer-Encoding is set to chunked already.
// if not add the header to the message
if (!HttpCodecUtil.isTransferEncodingChunked(m)) {
m.addHeader(Names.TRANSFER_ENCODING, Values.CHUNKED);
}
contentMustBeEmpty = true;
transferEncodingChunked = true;
}
} else {
transferEncodingChunked = contentMustBeEmpty = HttpCodecUtil.isTransferEncodingChunked(m);
}
HttpTransferEncoding te = m.getTransferEncoding();
lastTE = te;
// Calling setTransferEncoding() will sanitize the headers and the content.
// For example, it will remove the cases such as 'Transfer-Encoding' and 'Content-Length'
// coexist. It also removes the content if the transferEncoding is not SINGLE.
m.setTransferEncoding(te);
// Encode the message.
out.markWriterIndex();
encodeInitialLine(out, m);
encodeHeaders(out, m);
@ -91,20 +76,25 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
out.writeByte(LF);
ByteBuf content = m.getContent();
if (content.readable()) {
if (contentMustBeEmpty) {
out.resetWriterIndex();
throw new IllegalArgumentException(
"HttpMessage.content must be empty if Transfer-Encoding is chunked.");
} else {
out.writeBytes(content, content.readerIndex(), content.readableBytes());
}
}
out.writeBytes(content, content.readerIndex(), content.readableBytes());
} else if (msg instanceof HttpChunk) {
HttpChunk chunk = (HttpChunk) msg;
if (transferEncodingChunked) {
HttpTransferEncoding te = lastTE;
if (te == null) {
throw new IllegalArgumentException("HttpChunk must follow an HttpMessage.");
}
switch (te) {
case SINGLE:
throw new IllegalArgumentException(
"The transfer encoding of the last encoded HttpMessage is SINGLE.");
case STREAMED: {
ByteBuf content = chunk.getContent();
out.writeBytes(content, content.readerIndex(), content.readableBytes());
break;
}
case CHUNKED:
if (chunk.isLast()) {
transferEncodingChunked = false;
if (chunk instanceof HttpChunkTrailer) {
out.writeByte((byte) '0');
out.writeByte(CR);
@ -125,11 +115,6 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
out.writeByte(CR);
out.writeByte(LF);
}
} else {
if (!chunk.isLast()) {
ByteBuf chunkContent = chunk.getContent();
out.writeBytes(chunkContent, chunkContent.readerIndex(), chunkContent.readableBytes());
}
}
} else {
throw new UnsupportedMessageTypeException(msg, HttpMessage.class, HttpChunk.class);
@ -137,27 +122,18 @@ public abstract class HttpMessageEncoder extends MessageToByteEncoder<Object> {
}
private static void encodeHeaders(ByteBuf buf, HttpMessage message) {
try {
for (Map.Entry<String, String> h: message.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue());
}
} catch (UnsupportedEncodingException e) {
throw (Error) new Error().initCause(e);
for (Map.Entry<String, String> h: message.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue());
}
}
private static void encodeTrailingHeaders(ByteBuf buf, HttpChunkTrailer trailer) {
try {
for (Map.Entry<String, String> h: trailer.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue());
}
} catch (UnsupportedEncodingException e) {
throw (Error) new Error().initCause(e);
for (Map.Entry<String, String> h: trailer.getHeaders()) {
encodeHeader(buf, h.getKey(), h.getValue());
}
}
private static void encodeHeader(ByteBuf buf, String header, String value)
throws UnsupportedEncodingException {
private static void encodeHeader(ByteBuf buf, String header, String value) {
buf.writeBytes(header.getBytes(CharsetUtil.US_ASCII));
buf.writeByte(COLON);
buf.writeByte(SP);

View File

@ -0,0 +1,78 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.http;
/**
* Represents how an HTTP request or an HTTP response is represented as an {@link HttpMessage}
* and zero or more {@link HttpChunk}s.
*/
public enum HttpTransferEncoding {
/**
* An HTTP message whose transfer encoding is {@code chunked} as defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6">the section 3.6 of
* RFC2616</a> so that the content is split into multiple chunks. A complete HTTP message is
* composed of the following:
* <ol>
* <li>{@link HttpRequest} or {@link HttpResponse} with empty content</li>
* <li>A list of {@link HttpChunk}s whose content are not empty</li>
* <li>{@link HttpChunkTrailer}</li>
* </ol>
*/
CHUNKED(false),
/**
* An HTTP message whose transfer encoding is <strong>not</strong> {@code chunked}, but
* the length of its content is large enough so that the content is split into multiple
* chunks. A complete HTTP message is composted of the following.
* <ol>
* <li>{@link HttpRequest} or {@link HttpResponse} with empty content</li>
* <li>A list of {@link HttpChunk}s whose content are not empty</li>
* <li>{@link HttpChunkTrailer}</li>
* </ol>
* The difference from {@link #CHUNKED} is that the transfer encoding of the streamed content
* is <strong>not</strong> {@code chunked}, and thus {@link HttpMessageEncoder} will
* encode the content as-is, rather than prepending HTTP chunk headers as defined in
* <a href="http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6">the section 3.6 of
* RFC2616</a>.
*/
STREAMED(false),
/**
* A self-contained HTTP message which is not followed by any {@link HttpChunk}s.
* A user can set the content of the message via {@link HttpMessage#setContent(io.netty.buffer.ByteBuf)}.
*/
SINGLE(true);
private final boolean single;
private HttpTransferEncoding(boolean single) {
this.single = single;
}
/**
* Returns {@code true} if and only if a complete HTTP message is composed of an
* {@link HttpMessage} and one or more {@link HttpChunk}s.
*/
public boolean isMultiple() {
return !single;
}
/**
* Returns {@code true} if and only if a single {@link HttpMessage} represents a complete
* HTTP message, not followed by any {@link HttpChunk}s.
*/
public boolean isSingle() {
return single;
}
}

View File

@ -218,7 +218,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
private SpdySynStreamFrame createSynStreamFrame(HttpMessage httpMessage)
throws Exception {
boolean chunked = httpMessage.isChunked();
boolean chunked = httpMessage.getTransferEncoding().isMultiple();
// Get the Stream-ID, Associated-To-Stream-ID, Priority, URL, and scheme from the headers
int streamID = SpdyHttpHeaders.getStreamId(httpMessage);
@ -286,7 +286,7 @@ public class SpdyHttpEncoder extends MessageToMessageEncoder<Object, Object> {
private SpdySynReplyFrame createSynReplyFrame(HttpResponse httpResponse)
throws Exception {
boolean chunked = httpResponse.isChunked();
boolean chunked = httpResponse.getTransferEncoding().isMultiple();
// Get the Stream-ID from the headers
int streamID = SpdyHttpHeaders.getStreamId(httpResponse);

View File

@ -203,7 +203,7 @@ public class CookieDecoderTest {
assertNull(c.getCommentUrl());
assertNull(c.getDomain());
assertTrue(c.getPorts().isEmpty());
assertEquals(-1, c.getMaxAge());
assertEquals(Long.MIN_VALUE, c.getMaxAge());
c = it.next();
assertEquals(1, c.getVersion());
@ -214,7 +214,7 @@ public class CookieDecoderTest {
assertNull(c.getCommentUrl());
assertNull(c.getDomain());
assertTrue(c.getPorts().isEmpty());
assertEquals(-1, c.getMaxAge());
assertEquals(Long.MIN_VALUE, c.getMaxAge());
assertFalse(it.hasNext());
}
@ -239,7 +239,7 @@ public class CookieDecoderTest {
assertNull(c.getCommentUrl());
assertNull(c.getDomain());
assertTrue(c.getPorts().isEmpty());
assertEquals(-1, c.getMaxAge());
assertEquals(Long.MIN_VALUE, c.getMaxAge());
assertTrue(it.hasNext());
c = it.next();
@ -251,7 +251,7 @@ public class CookieDecoderTest {
assertNull(c.getComment());
assertNull(c.getCommentUrl());
assertTrue(c.getPorts().isEmpty());
assertEquals(-1, c.getMaxAge());
assertEquals(Long.MIN_VALUE, c.getMaxAge());
assertFalse(it.hasNext());
}

View File

@ -38,7 +38,7 @@ public class HttpChunkAggregatorTest {
HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1);
HttpHeaders.setHeader(message, "X-Test", true);
message.setChunked(true);
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
HttpChunk chunk3 = new DefaultHttpChunk(Unpooled.EMPTY_BUFFER);
@ -59,7 +59,7 @@ public class HttpChunkAggregatorTest {
}
private void checkContentBuffer(HttpMessage aggregatedMessage) {
private static void checkContentBuffer(HttpMessage aggregatedMessage) {
CompositeByteBuf buffer = (CompositeByteBuf) aggregatedMessage.getContent();
assertEquals(2, buffer.numComponents());
List<ByteBuf> buffers = buffer.decompose(0, buffer.capacity());
@ -76,7 +76,7 @@ public class HttpChunkAggregatorTest {
EmbeddedMessageChannel embedder = new EmbeddedMessageChannel(aggr);
HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1);
HttpHeaders.setHeader(message, "X-Test", true);
message.setChunked(true);
message.setTransferEncoding(HttpTransferEncoding.CHUNKED);
HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
HttpChunkTrailer trailer = new DefaultHttpChunkTrailer();
@ -107,7 +107,7 @@ public class HttpChunkAggregatorTest {
HttpChunkAggregator aggr = new HttpChunkAggregator(4);
EmbeddedMessageChannel embedder = new EmbeddedMessageChannel(aggr);
HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1);
message.setChunked(true);
message.setTransferEncoding(HttpTransferEncoding.STREAMED);
HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
assertFalse(embedder.writeInbound(message));

View File

@ -24,7 +24,7 @@ import org.junit.Assert;
import org.junit.Test;
public class HttpServerCodecTest {
/**
* Testcase for https://github.com/netty/netty/issues/433
*/
@ -45,8 +45,7 @@ public class HttpServerCodecTest {
decoderEmbedder.finish();
HttpMessage httpMessage = (HttpMessage) decoderEmbedder.readInbound();
Assert.assertTrue(httpMessage.isChunked());
Assert.assertSame(HttpTransferEncoding.STREAMED, httpMessage.getTransferEncoding());
boolean empty = true;
int totalBytesPolled = 0;
@ -63,7 +62,7 @@ public class HttpServerCodecTest {
Assert.assertEquals(offeredContentLength, totalBytesPolled);
}
private ByteBuf prepareDataChunk(int size) {
private static ByteBuf prepareDataChunk(int size) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < size; ++i) {
sb.append("a");

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-codec</artifactId>

View File

@ -27,7 +27,6 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
ByteBuf in = ctx.outboundByteBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer();
int oldOutSize = out.readableBytes();
while (in.readable()) {
int oldInSize = in.readableBytes();
try {

View File

@ -17,6 +17,8 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.NoSuchBufferException;
final class CodecUtil {
@ -72,7 +74,12 @@ final class CodecUtil {
}
}
throw new NoSuchBufferException();
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s which accepts a %s.",
ctx.name(),
inbound? ChannelInboundHandler.class.getSimpleName()
: ChannelOutboundHandler.class.getSimpleName(),
msg.getClass().getSimpleName()));
}
private CodecUtil() {

View File

@ -139,6 +139,11 @@ class ReplayingDecoderBuffer implements ByteBuf {
throw new UnreplayableOperationException();
}
@Override
public int ensureWritableBytes(int minWritableBytes, boolean force) {
throw new UnreplayableOperationException();
}
@Override
public ByteBuf duplicate() {
throw new UnreplayableOperationException();

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-common</artifactId>

View File

@ -103,13 +103,7 @@ public class UniqueName implements Comparable<UniqueName> {
return returnCode;
}
if (id < other.id) {
return -1;
} else if (id > other.id) {
return 1;
} else {
return 0;
}
return ((Integer) id).compareTo(other.id);
}
@Override

View File

@ -1,73 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
final class AtomicFieldUpdaterUtil {
private static final boolean AVAILABLE;
static final class Node {
volatile Node next;
}
static {
boolean available = false;
try {
AtomicReferenceFieldUpdater<Node, Node> tmp =
AtomicReferenceFieldUpdater.newUpdater(
Node.class, Node.class, "next");
// Test if AtomicReferenceFieldUpdater is really working.
Node testNode = new Node();
tmp.set(testNode, testNode);
if (testNode.next != testNode) {
// Not set as expected - fall back to the safe mode.
throw new Exception();
}
available = true;
} catch (Throwable t) {
// Running in a restricted environment with a security manager.
}
AVAILABLE = available;
}
static <T, V> AtomicReferenceFieldUpdater<T, V> newRefUpdater(Class<T> tclass, Class<V> vclass, String fieldName) {
if (AVAILABLE) {
return AtomicReferenceFieldUpdater.newUpdater(tclass, vclass, fieldName);
} else {
return null;
}
}
static <T> AtomicIntegerFieldUpdater<T> newIntUpdater(Class<T> tclass, String fieldName) {
if (AVAILABLE) {
return AtomicIntegerFieldUpdater.newUpdater(tclass, fieldName);
} else {
return null;
}
}
static boolean isAvailable() {
return AVAILABLE;
}
private AtomicFieldUpdaterUtil() {
// Unused
}
}

View File

@ -15,11 +15,13 @@
*/
package io.netty.util.internal;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Deflater;
/**
@ -35,11 +37,47 @@ public final class DetectionUtil {
private static final int JAVA_VERSION = javaVersion0();
private static final boolean HAS_UNSAFE = hasUnsafe(AtomicInteger.class.getClassLoader());
private static final boolean IS_WINDOWS;
private static final boolean IS_ROOT;
static {
String os = System.getProperty("os.name").toLowerCase();
String os = SystemPropertyUtil.get("os.name").toLowerCase();
// windows
IS_WINDOWS = os.contains("win");
boolean root = false;
if (!IS_WINDOWS) {
for (int i = 1023; i > 0; i --) {
ServerSocket ss = null;
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
ss.bind(new InetSocketAddress(i));
root = true;
break;
} catch (Exception e) {
// Failed to bind.
// Check the error message so that we don't always need to bind 1023 times.
String message = e.getMessage();
if (message == null) {
message = "";
}
message = message.toLowerCase();
if (message.matches(".*permission.*denied.*")) {
break;
}
} finally {
if (ss != null) {
try {
ss.close();
} catch (Exception e) {
// Ignore.
}
}
}
}
}
IS_ROOT = root;
}
/**
@ -49,6 +87,14 @@ public final class DetectionUtil {
return IS_WINDOWS;
}
/**
* Return {@code true} if the current user is root. Note that this method returns
* {@code false} if on Windows.
*/
public static boolean isRoot() {
return IS_ROOT;
}
public static boolean hasUnsafe() {
return HAS_UNSAFE;
}
@ -58,17 +104,20 @@ public final class DetectionUtil {
}
private static boolean hasUnsafe(ClassLoader loader) {
String value = SystemPropertyUtil.get("io.netty.noUnsafe");
if (value != null) {
boolean noUnsafe = SystemPropertyUtil.getBoolean("io.netty.noUnsafe", false);
if (noUnsafe) {
return false;
}
// Legacy properties
value = SystemPropertyUtil.get("io.netty.tryUnsafe");
if (value == null) {
value = SystemPropertyUtil.get("org.jboss.netty.tryUnsafe", "true");
boolean tryUnsafe = false;
if (SystemPropertyUtil.contains("io.netty.tryUnsafe")) {
tryUnsafe = SystemPropertyUtil.getBoolean("io.netty.tryUnsafe", true);
} else {
tryUnsafe = SystemPropertyUtil.getBoolean("org.jboss.netty.tryUnsafe", true);
}
if (!"true".equalsIgnoreCase(value)) {
if (!tryUnsafe) {
return false;
}
@ -78,6 +127,7 @@ public final class DetectionUtil {
} catch (Exception e) {
// Ignore
}
return false;
}
@ -101,7 +151,9 @@ public final class DetectionUtil {
}
try {
Deflater.class.getDeclaredField("SYNC_FLUSH");
Class.forName(
"java.util.concurrent.LinkedTransferQueue", false,
BlockingQueue.class.getClassLoader());
return 7;
} catch (Exception e) {
// Ignore

View File

@ -15,27 +15,65 @@
*/
package io.netty.util.internal;
import java.util.regex.Pattern;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.Properties;
/**
* Accesses the system property swallowing a {@link SecurityException}.
* A collection of utility methods to retrieve and parse the values of the Java system properties.
*/
final class SystemPropertyUtil {
public final class SystemPropertyUtil {
private static final Properties props = new Properties();
private static final InternalLogger logger;
// Retrieve all system properties at once so that there's no need to deal with
// security exceptions from next time. Otherwise, we might end up with logging every
// security exceptions on every system property access or introducing more complexity
// just because of less verbose logging.
static {
refresh();
logger = InternalLoggerFactory.getInstance(SystemPropertyUtil.class);
}
/**
* Re-retrieves all system properties so that any post-launch properties updates are retrieved.
*/
public static void refresh() {
Properties newProps = null;
try {
newProps = System.getProperties();
} catch (SecurityException e) {
logger.warn("Unable to retrieve the system properties; default values will be used.", e);
newProps = new Properties();
}
synchronized (props) {
props.clear();
props.putAll(newProps);
}
}
/**
* Returns {@code true} if and only if the system property with the specified {@code key}
* exists.
*/
public static boolean contains(String key) {
if (key == null) {
throw new NullPointerException("key");
}
return props.containsKey(key);
}
/**
* Returns the value of the Java system property with the specified
* {@code key}.
* {@code key}, while falling back to {@code null} if the property access fails.
*
* @return the property value.
* {@code null} if there's no such property or if an access to the
* specified property is not allowed.
* @return the property value or {@code null}
*/
public static String get(String key) {
try {
return System.getProperty(key);
} catch (Exception e) {
return null;
}
return get(key, null);
}
/**
@ -48,10 +86,15 @@ final class SystemPropertyUtil {
* specified property is not allowed.
*/
public static String get(String key, String def) {
String value = get(key);
if (value == null) {
value = def;
if (key == null) {
throw new NullPointerException("key");
}
String value = props.getProperty(key);
if (value == null) {
return def;
}
return value;
}
@ -64,17 +107,104 @@ final class SystemPropertyUtil {
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static int get(String key, int def) {
String value = get(key);
public static boolean getBoolean(String key, boolean def) {
if (key == null) {
throw new NullPointerException("key");
}
String value = props.getProperty(key);
if (value == null) {
return def;
}
if (Pattern.matches("-?[0-9]+", value)) {
return Integer.parseInt(value);
} else {
value = value.trim().toLowerCase();
if (value.length() == 0) {
return true;
}
if (value.equals("true") || value.equals("yes") || value.equals("1")) {
return true;
}
if (value.equals("false") || value.equals("no") || value.equals("0")) {
return false;
}
logger.warn(
"Unable to parse the boolean system property '" + key + "':" + value + " - " +
"using the default value: " + def);
return def;
}
/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static int getInt(String key, int def) {
if (key == null) {
throw new NullPointerException("key");
}
String value = props.getProperty(key);
if (value == null) {
return def;
}
value = value.trim().toLowerCase();
if (value.matches("-?[0-9]+")) {
try {
return Integer.parseInt(value);
} catch (Exception e) {
// Ignore
}
}
logger.warn(
"Unable to parse the integer system property '" + key + "':" + value + " - " +
"using the default value: " + def);
return def;
}
/**
* Returns the value of the Java system property with the specified
* {@code key}, while falling back to the specified default value if
* the property access fails.
*
* @return the property value.
* {@code def} if there's no such property or if an access to the
* specified property is not allowed.
*/
public static long getLong(String key, long def) {
if (key == null) {
throw new NullPointerException("key");
}
String value = props.getProperty(key);
if (value == null) {
return def;
}
value = value.trim().toLowerCase();
if (value.matches("-?[0-9]+")) {
try {
return Long.parseLong(value);
} catch (Exception e) {
// Ignore
}
}
logger.warn(
"Unable to parse the long integer system property '" + key + "':" + value + " - " +
"using the default value: " + def);
return def;
}
private SystemPropertyUtil() {

View File

@ -1,125 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/licenses/publicdomain
*/
package io.netty.util.internal;
import java.util.Random;
/**
* A random number generator isolated to the current thread. Like the
* global {@link java.util.Random} generator used by the {@link
* java.lang.Math} class, a {@code ThreadLocalRandom} is initialized
* with an internally generated seed that may not otherwise be
* modified. When applicable, use of {@code ThreadLocalRandom} rather
* than shared {@code Random} objects in concurrent programs will
* typically encounter much less overhead and contention. Use of
* {@code ThreadLocalRandom} is particularly appropriate when multiple
* tasks use random numbers in parallel in thread pools.
*
* <p>Usages of this class should typically be of the form:
* {@code ThreadLocalRandom.current().nextX(...)} (where
* {@code X} is {@code Int}, {@code Long}, etc).
* When all usages are of this form, it is never possible to
* accidently share a {@code ThreadLocalRandom} across multiple threads.
*
* <p>This class also provides additional commonly used bounded random
* generation methods.
*
* @since 1.7
*/
final class ThreadLocalRandom extends Random {
// same constants as Random, but must be redeclared because private
private static final long multiplier = 0x5DEECE66DL;
private static final long addend = 0xBL;
private static final long mask = (1L << 48) - 1;
/**
* The random seed. We can't use super.seed.
*/
private long rnd;
/**
* Initialization flag to permit the first and only allowed call
* to setSeed (inside Random constructor) to succeed. We can't
* allow others since it would cause setting seed in one part of a
* program to unintentionally impact other usages by the thread.
*/
private boolean initialized;
// Padding to help avoid memory contention among seed updates in
// different TLRs in the common case that they are located near
// each other.
@SuppressWarnings("unused")
private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
/**
* The actual ThreadLocal
*/
private static final ThreadLocal<ThreadLocalRandom> localRandom =
new ThreadLocal<ThreadLocalRandom>() {
@Override
protected ThreadLocalRandom initialValue() {
return new ThreadLocalRandom();
}
};
/**
* Constructor called only by localRandom.initialValue.
* We rely on the fact that the superclass no-arg constructor
* invokes setSeed exactly once to initialize.
*/
ThreadLocalRandom() {
}
/**
* Returns the current thread's {@code ThreadLocalRandom}.
*
* @return the current thread's {@code ThreadLocalRandom}
*/
static ThreadLocalRandom current() {
return localRandom.get();
}
/**
* Throws {@code UnsupportedOperationException}. Setting seeds in
* this generator is not supported.
*
* @throws UnsupportedOperationException always
*/
@Override
public void setSeed(long seed) {
if (initialized) {
throw new UnsupportedOperationException();
}
initialized = true;
rnd = (seed ^ multiplier) & mask;
}
@Override
protected int next(int bits) {
rnd = rnd * multiplier + addend & mask;
return (int) (rnd >>> 48 - bits);
}
private static final long serialVersionUID = -5851777807851030925L;
}

View File

@ -53,6 +53,20 @@ public class InternalLoggerFactoryTest {
public void shouldReturnWrappedLogger() {
assertNotSame(mock, InternalLoggerFactory.getInstance("mock"));
}
@Test
public void shouldGetInstance() {
InternalLoggerFactory.setDefaultFactory(oldLoggerFactory);
String helloWorld = "Hello, world!";
InternalLogger one = InternalLoggerFactory.getInstance("helloWorld");
InternalLogger two = InternalLoggerFactory.getInstance(helloWorld.getClass());
assertNotNull(one);
assertNotNull(two);
assertNotSame(one, two);
}
@Test
public void testIsTraceEnabled() {

View File

@ -0,0 +1,70 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
public class DefaultAttributeMapTest {
private DefaultAttributeMap map;
@Before
public void setup() {
map = new DefaultAttributeMap();
}
@Test
public void testMapExists() {
assertNotNull(map);
}
@Test
public void testGetSetString() {
AttributeKey<String> key = new AttributeKey<String>("Nothing");
Attribute<String> one = map.attr(key);
assertSame(one, map.attr(key));
one.setIfAbsent("Whoohoo");
assertSame(one.get(), "Whoohoo");
one.setIfAbsent("What");
assertNotSame(one.get(), "What");
one.remove();
assertNull(one.get());
}
@Test
public void testGetSetInt() {
AttributeKey<Integer> key = new AttributeKey<Integer>("Nada");
Attribute<Integer> one = map.attr(key);
assertSame(one, map.attr(key));
one.setIfAbsent(3653);
assertEquals(one.get(), Integer.valueOf(3653));
one.setIfAbsent(1);
assertNotSame(one.get(), 1);
one.remove();
assertNull(one.get());
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import static org.junit.Assert.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.junit.Test;
public class NetworkConstantsTest {
@Test
public void testLocalhost() throws UnknownHostException {
assertNotNull(NetworkConstants.LOCALHOST);
assertSame(NetworkConstants.LOCALHOST, InetAddress.getLocalHost());
}
@Test
public void testLoopback() {
assertNotNull(NetworkConstants.LOOPBACK_IF);
}
}

View File

@ -15,10 +15,11 @@
*/
package io.netty.util;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotSame;
import org.junit.Before;
import org.junit.Test;
@ -29,7 +30,7 @@ public class UniqueNameTest {
* This is set up before each test
*/
private ConcurrentHashMap<String, Boolean> names;
/**
* Registers a {@link UniqueName}
*
@ -42,15 +43,30 @@ public class UniqueNameTest {
@Before
public void initializeTest() {
this.names = new ConcurrentHashMap<String, Boolean>();
names = new ConcurrentHashMap<String, Boolean>();
}
@Test(expected=NullPointerException.class)
public void testCannnotProvideNullMap() {
new UniqueName(null, "Nothing");
}
@Test(expected=NullPointerException.class)
public void testCannotProvideNullName() {
new UniqueName(names, null);
}
@Test
public void testArgsCanBePassed() {
new UniqueName(names, "Argh, matey!", 2, 5, new Object());
}
@Test
public void testRegisteringName() {
registerName("Abcedrian");
assertTrue(this.names.get("Abcedrian"));
assertTrue(this.names.get("Hellyes") == null);
assertTrue(names.get("Abcedrian"));
assertTrue(names.get("Hellyes") == null);
}
@Test
@ -78,10 +94,28 @@ public class UniqueNameTest {
nameList.add(currentName);
for (UniqueName otherName : nameList) {
if (!currentName.name().equals(otherName.name())) {
assertNotSame(currentName.id(), otherName.name());
assertNotSame(currentName, otherName);
assertNotSame(currentName.hashCode(), otherName.hashCode());
assertFalse(currentName.equals(otherName));
assertNotSame(currentName.toString(), otherName.toString());
}
}
}
}
@Test
public void testCompareNames() {
UniqueName one = registerName("One");
UniqueName two = registerName("Two");
ConcurrentHashMap<String, Boolean> mapTwo = new ConcurrentHashMap<String, Boolean>();
UniqueName three = new UniqueName(mapTwo, "One");
assertSame(one.compareTo(one), 0);
assertSame(one.compareTo(two), -5);
assertSame(one.compareTo(three), -1);
assertSame(three.compareTo(one), 1);
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
public class StringUtilTest {
@Test
public void ensureNewlineExists() {
assertNotNull(StringUtil.NEWLINE);
}
}

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-example</artifactId>

View File

@ -44,7 +44,7 @@ public class HttpSnoopClientHandler extends ChannelInboundMessageHandlerAdapter<
System.out.println();
}
if (response.isChunked()) {
if (response.getTransferEncoding().isMultiple()) {
readingChunks = true;
System.out.println("CHUNKED CONTENT {");
} else {

View File

@ -84,7 +84,7 @@ public class HttpSnoopServerHandler extends ChannelInboundMessageHandlerAdapter<
buf.append("\r\n");
}
if (request.isChunked()) {
if (request.getTransferEncoding().isMultiple()) {
readingChunks = true;
} else {
ByteBuf content = request.getContent();

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-handler</artifactId>

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFlushFutureNotifier;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
@ -161,6 +162,7 @@ public class SslHandler
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
private final Executor delegatedTaskExecutor;
private final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier();
private final boolean startTls;
private boolean sentFirstMessage;
@ -268,6 +270,7 @@ public class SslHandler
} catch (Exception e) {
future.setFailure(e);
ctx.fireExceptionCaught(e);
ctx.close();
}
}
});
@ -330,7 +333,6 @@ public class SslHandler
closeOutboundAndChannel(ctx, future, false);
}
@Override
public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
final ByteBuf in = ctx.outboundByteBuffer();
@ -347,12 +349,20 @@ public class SslHandler
return;
}
if (ctx.executor() == ctx.channel().eventLoop()) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
} else {
synchronized (flushFutureNotifier) {
flushFutureNotifier.addFlushFuture(future, in.readableBytes());
}
}
boolean unwrapLater = false;
int bytesProduced = 0;
int bytesConsumed = 0;
try {
for (;;) {
SSLEngineResult result = wrap(engine, in, out);
bytesProduced += result.bytesProduced();
bytesConsumed += result.bytesConsumed();
if (result.getStatus() == Status.CLOSED) {
// SSLEngine has been closed already.
// Any further write attempts should be denied.
@ -361,6 +371,8 @@ public class SslHandler
SSLException e = new SSLException("SSLEngine already closed");
future.setFailure(e);
ctx.fireExceptionCaught(e);
flush0(ctx, bytesConsumed, e);
bytesConsumed = 0;
}
break;
} else {
@ -399,10 +411,60 @@ public class SslHandler
throw e;
} finally {
in.unsafe().discardSomeReadBytes();
ctx.flush(future);
flush0(ctx, bytesConsumed);
}
}
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed) {
ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(bytesConsumed, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(bytesConsumed, future);
}
}
}
private void notifyFlushFutures(final int bytesConsumed, ChannelFuture future) {
if (future.isSuccess()) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
flushFutureNotifier.notifyFlushFutures();
} else {
flushFutureNotifier.notifyFlushFutures(future.cause());
}
}
}));
}
private void flush0(final ChannelHandlerContext ctx, final int bytesConsumed, final Throwable cause) {
ChannelFuture flushFuture = ctx.flush(ctx.newFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (ctx.executor() == ctx.channel().eventLoop()) {
notifyFlushFutures(bytesConsumed, cause, future);
} else {
synchronized (flushFutureNotifier) {
notifyFlushFutures(bytesConsumed, cause, future);
}
}
}
private void notifyFlushFutures(int bytesConsumed, Throwable cause, ChannelFuture future) {
flushFutureNotifier.increaseWriteCounter(bytesConsumed);
if (future.isSuccess()) {
flushFutureNotifier.notifyFlushFutures(cause);
} else {
flushFutureNotifier.notifyFlushFutures(cause, future.cause());
}
}
}));
safeClose(ctx, flushFuture, ctx.newFuture());
}
private static SSLEngineResult wrap(SSLEngine engine, ByteBuf in, ByteBuf out) throws SSLException {
ByteBuffer in0 = in.nioBuffer();
for (;;) {
@ -595,7 +657,9 @@ public class SslHandler
NotSslRecordException e = new NotSslRecordException(
"not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
in.skipBytes(in.readableBytes());
throw e;
ctx.fireExceptionCaught(e);
setHandshakeFailure(e);
return;
}
}
@ -719,16 +783,19 @@ public class SslHandler
}
}
if (cause == null) {
cause = new ClosedChannelException();
}
for (;;) {
ChannelFuture f = handshakeFutures.poll();
if (f == null) {
break;
}
if (cause == null) {
cause = new ClosedChannelException();
}
f.setFailure(cause);
}
flush0(ctx, 0, cause);
}
private void closeOutboundAndChannel(
@ -746,26 +813,7 @@ public class SslHandler
ChannelFuture closeNotifyFuture = ctx.newFuture();
flush(ctx, closeNotifyFuture);
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(ctx.channel() + "close_notify write attempt timed out. Force-closing the connection.");
ctx.close(future);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
closeNotifyFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
if (timeoutFuture.cancel(false)) {
ctx.close(future);
}
}
});
safeClose(ctx, closeNotifyFuture, future);
}
@Override
@ -794,11 +842,11 @@ public class SslHandler
// issue and handshake and add a listener to it which will fire an exception event if
// an exception was thrown while doing the handshake
handshake().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
ctx.pipeline().fireExceptionCaught(future.cause());
ctx.close();
} else {
// Send the event upstream after the handshake was completed without an error.
//
@ -813,6 +861,38 @@ public class SslHandler
}
}
private static void safeClose(
final ChannelHandlerContext ctx, ChannelFuture flushFuture,
final ChannelFuture closeFuture) {
if (!ctx.channel().isActive()) {
ctx.close(closeFuture);
return;
}
// Force-close the connection if close_notify is not fully sent in time.
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
logger.warn(
ctx.channel() + " last lssssswrite attempt timed out." +
" Force-closing the connection.");
ctx.close(closeFuture);
}
}, 3, TimeUnit.SECONDS); // FIXME: Magic value
// Close the connection if close_notify is sent in time.
flushFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f)
throws Exception {
timeoutFuture.cancel(false);
if (ctx.channel().isActive()) {
ctx.close(closeFuture);
}
}
});
}
private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
public SSLEngineInboundCloseFuture() {
super(null, true);
@ -842,6 +922,4 @@ public class SslHandler
return false;
}
}
}

16
pom.xml
View File

@ -26,7 +26,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<packaging>pom</packaging>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
<name>Netty</name>
<url>http://netty.io/</url>
@ -351,6 +351,19 @@
<version>2.8.1</version>
<configuration>
<detectOfflineLinks>false</detectOfflineLinks>
<charset>UTF-8</charset>
<docencoding>UTF-8</docencoding>
<breakiterator>true</breakiterator>
<version>false</version>
<author>false</author>
<keywords>true</keywords>
</configuration>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<retryFailedDeploymentCount>10</retryFailedDeploymentCount>
</configuration>
</plugin>
<plugin>
@ -361,7 +374,6 @@
<arguments>-P release,sonatype-oss-release,full</arguments>
<autoVersionSubmodules>true</autoVersionSubmodules>
<tagNameFormat>netty-@{project.version}</tagNameFormat>
<localCheckout>true</localCheckout>
</configuration>
</plugin>
</plugins>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-tarball</artifactId>

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-testsuite</artifactId>
@ -40,5 +40,17 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,66 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation.Factory;
import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetworkConstants;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class AbstractClientSocketTest {
private static final List<Factory<Bootstrap>> COMBO = SocketTestPermutation.clientSocket();
@Rule
public final TestName testName = new TestName();
protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
protected volatile Bootstrap cb;
protected volatile InetSocketAddress addr;
protected void run() throws Throwable {
int i = 0;
for (Factory<Bootstrap> e: COMBO) {
cb = e.newInstance();
addr = new InetSocketAddress(
NetworkConstants.LOCALHOST, TestUtils.getFreePort());
cb.remoteAddress(addr);
logger.info(String.format(
"Running: %s %d of %d", testName.getMethodName(), ++ i, COMBO.size()));
try {
Method m = getClass().getDeclaredMethod(
testName.getMethodName(), Bootstrap.class);
m.invoke(this, cb);
} catch (InvocationTargetException ex) {
throw ex.getCause();
} finally {
cb.shutdown();
}
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.testsuite.transport.socket.SocketTestPermutation.Factory;
import io.netty.testsuite.util.TestUtils;
import io.netty.util.NetworkConstants;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class AbstractServerSocketTest {
private static final List<Factory<ServerBootstrap>> COMBO = SocketTestPermutation.serverSocket();
@Rule
public final TestName testName = new TestName();
protected final InternalLogger logger = InternalLoggerFactory.getInstance(getClass());
protected volatile ServerBootstrap sb;
protected volatile InetSocketAddress addr;
protected void run() throws Throwable {
int i = 0;
for (Factory<ServerBootstrap> e: COMBO) {
sb = e.newInstance();
addr = new InetSocketAddress(
NetworkConstants.LOCALHOST, TestUtils.getFreePort());
sb.localAddress(addr);
logger.info(String.format(
"Running: %s %d of %d", testName.getMethodName(), ++ i, COMBO.size()));
try {
Method m = getClass().getDeclaredMethod(
testName.getMethodName(), ServerBootstrap.class);
m.invoke(this, sb);
} catch (InvocationTargetException ex) {
throw ex.getCause();
} finally {
sb.shutdown();
}
}
}
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelOption;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import junit.framework.Assert;
import org.junit.Test;
public class ServerSocketSuspendTest extends AbstractServerSocketTest {
private static final int NUM_CHANNELS = 10;
private static final long TIMEOUT = 3000000000L;
@Test
public void testSuspendAndResumeAccept() throws Throwable {
run();
}
public void testSuspendAndResumeAccept(ServerBootstrap sb) throws Throwable {
AcceptedChannelCounter counter = new AcceptedChannelCounter(NUM_CHANNELS);
sb.option(ChannelOption.SO_BACKLOG, 1);
sb.childHandler(counter);
Channel sc = sb.bind().sync().channel();
sc.pipeline().firstContext().readable(false);
List<Socket> sockets = new ArrayList<Socket>();
try {
long startTime = System.nanoTime();
for (int i = 0; i < NUM_CHANNELS; i ++) {
Socket s = new Socket();
s.connect(addr, 10000);
sockets.add(s);
}
sc.pipeline().firstContext().readable(true);
counter.latch.await();
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime > TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();
}
}
try {
long startTime = System.nanoTime();
for (int i = 0; i < NUM_CHANNELS; i ++) {
Socket s = new Socket();
s.connect(addr, 10000);
sockets.add(s);
}
long endTime = System.nanoTime();
Assert.assertTrue(endTime - startTime < TIMEOUT);
} finally {
for (Socket s: sockets) {
s.close();
}
}
}
@ChannelHandler.Sharable
private final class AcceptedChannelCounter extends ChannelInboundByteHandlerAdapter {
final CountDownLatch latch;
AcceptedChannelCounter(int nChannels) {
latch = new CountDownLatch(nChannels);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// Unused
}
}
}

View File

@ -45,8 +45,21 @@ public class SocketEchoTest extends AbstractSocketTest {
}
public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
testSimpleEcho0(sb, cb, Integer.MAX_VALUE);
}
@Test
public void testSimpleEchoWithBoundedBuffer() throws Throwable {
run();
}
public void testSimpleEchoWithBoundedBuffer(ServerBootstrap sb, Bootstrap cb) throws Throwable {
testSimpleEcho0(sb, cb, 32);
}
private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, int maxInboundBufferSize) throws Throwable {
EchoHandler sh = new EchoHandler(maxInboundBufferSize);
EchoHandler ch = new EchoHandler(maxInboundBufferSize);
sb.childHandler(sh);
cb.handler(ch);
@ -109,11 +122,18 @@ public class SocketEchoTest extends AbstractSocketTest {
}
private static class EchoHandler extends ChannelInboundByteHandlerAdapter {
private final int maxInboundBufferSize;
volatile Channel channel;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
volatile int counter;
EchoHandler() {
EchoHandler(int maxInboundBufferSize) {
this.maxInboundBufferSize = maxInboundBufferSize;
}
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.buffer(0, maxInboundBufferSize);
}
@Override

View File

@ -0,0 +1,140 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInputShutdownEvent;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class SocketShutdownOutputByPeerTest extends AbstractServerSocketTest {
@Test(timeout = 30000)
public void testShutdownOutput() throws Throwable {
run();
}
public void testShutdownOutput(ServerBootstrap sb) throws Throwable {
TestHandler h = new TestHandler();
Socket s = new Socket();
try {
sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync();
s.connect(addr, 10000);
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
s.shutdownOutput();
h.halfClosure.await();
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertTrue(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
assertEquals(1, h.closure.getCount());
assertEquals(1, h.halfClosureCount.intValue());
} finally {
s.close();
}
}
@Test(timeout = 30000)
public void testShutdownOutputWithoutOption() throws Throwable {
run();
}
public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable {
TestHandler h = new TestHandler();
Socket s = new Socket();
try {
sb.childHandler(h).bind().sync();
s.connect(addr, 10000);
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
s.shutdownOutput();
h.closure.await();
assertFalse(h.ch.isOpen());
assertFalse(h.ch.isActive());
assertTrue(h.ch.isInputShutdown());
assertTrue(h.ch.isOutputShutdown());
assertEquals(1, h.halfClosure.getCount());
assertEquals(0, h.halfClosureCount.intValue());
} finally {
s.close();
}
}
private static class TestHandler extends ChannelInboundByteHandlerAdapter {
volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new SynchronousQueue<Byte>();
final CountDownLatch halfClosure = new CountDownLatch(1);
final CountDownLatch closure = new CountDownLatch(1);
final AtomicInteger halfClosureCount = new AtomicInteger();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ch = (SocketChannel) ctx.channel();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
closure.countDown();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
queue.offer(in.readByte());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ChannelInputShutdownEvent) {
halfClosureCount.incrementAndGet();
halfClosure.countDown();
}
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import org.junit.Test;
public class SocketShutdownOutputBySelfTest extends AbstractClientSocketTest {
@Test(timeout = 30000)
public void testShutdownOutput() throws Throwable {
run();
}
public void testShutdownOutput(Bootstrap cb) throws Throwable {
TestHandler h = new TestHandler();
ServerSocket ss = new ServerSocket();
Socket s = null;
try {
ss.bind(addr);
SocketChannel ch = (SocketChannel) cb.handler(h).connect().sync().channel();
assertTrue(ch.isActive());
assertFalse(ch.isOutputShutdown());
s = ss.accept();
ch.write(Unpooled.wrappedBuffer(new byte[] { 1 })).sync();
assertEquals(1, s.getInputStream().read());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertFalse(h.ch.isOutputShutdown());
// Make the connection half-closed and ensure read() returns -1.
ch.shutdownOutput().sync();
assertEquals(-1, s.getInputStream().read());
assertTrue(h.ch.isOpen());
assertTrue(h.ch.isActive());
assertFalse(h.ch.isInputShutdown());
assertTrue(h.ch.isOutputShutdown());
// If half-closed, the peer should be able to write something.
s.getOutputStream().write(1);
assertEquals(1, (int) h.queue.take());
} finally {
if (s != null) {
s.close();
}
ss.close();
}
}
private static class TestHandler extends ChannelInboundByteHandlerAdapter {
volatile SocketChannel ch;
final BlockingQueue<Byte> queue = new SynchronousQueue<Byte>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ch = (SocketChannel) ctx.channel();
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
queue.offer(in.readByte());
}
}
}

View File

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInitializer;
@ -38,6 +39,7 @@ import java.security.Security;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.KeyManagerFactory;
@ -52,6 +54,7 @@ import org.junit.Test;
public class SocketSslEchoTest extends AbstractSocketTest {
private static final int FIRST_MESSAGE_SIZE = 16384;
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
@ -92,9 +95,20 @@ public class SocketSslEchoTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
ChannelFuture hf = cc.pipeline().get(SslHandler.class).handshake();
final ChannelFuture firstByteWriteFuture =
cc.write(Unpooled.wrappedBuffer(data, 0, FIRST_MESSAGE_SIZE));
final AtomicBoolean firstByteWriteFutureDone = new AtomicBoolean();
hf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
firstByteWriteFutureDone.set(firstByteWriteFuture.isDone());
}
});
hf.sync();
for (int i = 0; i < data.length;) {
assertFalse(firstByteWriteFutureDone.get());
for (int i = FIRST_MESSAGE_SIZE; i < data.length;) {
int length = Math.min(random.nextInt(1024 * 64), data.length - i);
cc.write(Unpooled.wrappedBuffer(data, i, length));
i += length;

View File

@ -1,128 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import static org.junit.Assert.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Ignore;
import org.junit.Test;
public class SocketSuspendTest extends AbstractSocketTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
static {
random.nextBytes(data);
}
@Ignore
@Test
public void testSuspendAccept() throws Throwable {
run();
}
public void testSuspendAccept(ServerBootstrap sb, Bootstrap cb) throws Throwable {
ServerHandler handler = new ServerHandler();
GroupHandler sh = new GroupHandler();
GroupHandler ch = new GroupHandler();
sb.handler(handler);
sb.childHandler(sh);
Channel sc = sb.bind().sync().channel();
cb.handler(ch);
cb.connect().sync();
Thread.sleep(1000);
Bootstrap cb2 = currentBootstrap.newInstance();
cb2.handler(ch);
cb2.remoteAddress(addr);
ChannelFuture cf = cb2.connect();
assertFalse(cf.await(2, TimeUnit.SECONDS));
sc.pipeline().context(handler).readable(true);
assertTrue(cf.await(2, TimeUnit.SECONDS));
sh.group.close().awaitUninterruptibly();
ch.group.close().awaitUninterruptibly();
sc.close().sync();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}
if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
throw ch.exception.get();
}
if (sh.exception.get() != null) {
throw sh.exception.get();
}
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class ServerHandler extends ChannelInboundMessageHandlerAdapter<SocketChannel> {
@Override
public void messageReceived(ChannelHandlerContext ctx, SocketChannel msg) throws Exception {
ctx.nextInboundMessageBuffer().add(msg);
ctx.readable(false);
}
}
@ChannelHandler.Sharable
private static class GroupHandler extends ChannelInboundByteHandlerAdapter {
final ChannelGroup group = new DefaultChannelGroup();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
group.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
if (exception.compareAndSet(null, cause)) {
ctx.close();
}
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
in.clear();
}
}
}

View File

@ -41,57 +41,10 @@ final class SocketTestPermutation {
new ArrayList<Entry<Factory<ServerBootstrap>, Factory<Bootstrap>>>();
// Make the list of ServerBootstrap factories.
List<Factory<ServerBootstrap>> sbfs =
new ArrayList<Factory<ServerBootstrap>>();
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new NioEventLoopGroup(), new NioEventLoopGroup()).
channel(new NioServerSocketChannel());
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
AioEventLoopGroup parentGroup = new AioEventLoopGroup();
AioEventLoopGroup childGroup = new AioEventLoopGroup();
return new ServerBootstrap().
group(parentGroup, childGroup).
channel(new AioServerSocketChannel(parentGroup, childGroup));
}
});
sbfs.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new OioEventLoopGroup(), new OioEventLoopGroup()).
channel(new OioServerSocketChannel());
}
});
List<Factory<ServerBootstrap>> sbfs = serverSocket();
// Make the list of Bootstrap factories.
List<Factory<Bootstrap>> cbfs =
new ArrayList<Factory<Bootstrap>>();
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(new NioEventLoopGroup()).channel(new NioSocketChannel());
}
});
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
AioEventLoopGroup loop = new AioEventLoopGroup();
return new Bootstrap().group(loop).channel(new AioSocketChannel(loop));
}
});
cbfs.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioSocketChannel());
}
});
List<Factory<Bootstrap>> cbfs = clientSocket();
// Populate the combinations
for (Factory<ServerBootstrap> sbf: sbfs) {
@ -170,6 +123,65 @@ final class SocketTestPermutation {
return list;
}
static List<Factory<ServerBootstrap>> serverSocket() {
List<Factory<ServerBootstrap>> list = new ArrayList<Factory<ServerBootstrap>>();
// Make the list of ServerBootstrap factories.
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new NioEventLoopGroup(), new NioEventLoopGroup()).
channel(new NioServerSocketChannel());
}
});
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
AioEventLoopGroup parentGroup = new AioEventLoopGroup();
AioEventLoopGroup childGroup = new AioEventLoopGroup();
return new ServerBootstrap().
group(parentGroup, childGroup).
channel(new AioServerSocketChannel(parentGroup, childGroup));
}
});
list.add(new Factory<ServerBootstrap>() {
@Override
public ServerBootstrap newInstance() {
return new ServerBootstrap().
group(new OioEventLoopGroup(), new OioEventLoopGroup()).
channel(new OioServerSocketChannel());
}
});
return list;
}
static List<Factory<Bootstrap>> clientSocket() {
List<Factory<Bootstrap>> list = new ArrayList<Factory<Bootstrap>>();
list.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(new NioEventLoopGroup()).channel(new NioSocketChannel());
}
});
list.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
AioEventLoopGroup loop = new AioEventLoopGroup();
return new Bootstrap().group(loop).channel(new AioSocketChannel(loop));
}
});
list.add(new Factory<Bootstrap>() {
@Override
public Bootstrap newInstance() {
return new Bootstrap().group(new OioEventLoopGroup()).channel(new OioSocketChannel());
}
});
return list;
}
private SocketTestPermutation() {}
interface Factory<T> {

View File

@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.0.0.Alpha2-SNAPSHOT</version>
<version>4.0.0.Alpha4-SNAPSHOT</version>
</parent>
<artifactId>netty-transport</artifactId>

View File

@ -20,12 +20,13 @@ import io.netty.buffer.MessageBuf;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -38,13 +39,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
private static final Random random = new Random();
/**
* Generates a negative unique integer ID. This method generates only
* negative integers to avoid conflicts with user-specified IDs where only
* non-negative integers are allowed.
*/
private static Integer allocateId(Channel channel) {
int idVal = System.identityHashCode(channel);
int idVal = random.nextInt();
if (idVal > 0) {
idVal = -idVal;
} else if (idVal == 0) {
@ -77,14 +80,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private final ChannelFuture voidFuture = new VoidChannelFuture(this);
private final CloseFuture closeFuture = new CloseFuture(this);
protected final ChannelFlushFutureNotifier flushFutureNotifier = new ChannelFlushFutureNotifier();
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
private volatile EventLoop eventLoop;
private volatile boolean registered;
private ClosedChannelException closedChannelException;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private long writeCounter;
private boolean inFlushNow;
private boolean flushNowPending;
@ -311,12 +314,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract Unsafe newUnsafe();
/**
* Returns the {@linkplain System#identityHashCode(Object) identity hash code}
* of this channel.
* Returns the ID of this channel.
*/
@Override
public final int hashCode() {
return System.identityHashCode(this);
return id;
}
/**
@ -450,6 +452,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
boolean wasActive = isActive();
// See: https://github.com/netty/netty/issues/576
if (!DetectionUtil.isWindows() && !DetectionUtil.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
doBind(localAddress);
future.setSuccess();
if (!wasActive && isActive()) {
@ -510,7 +526,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closedChannelException = new ClosedChannelException();
}
notifyFlushFutures(closedChannelException);
flushFutureNotifier.notifyFlushFutures(closedChannelException);
if (wasActive && !isActive()) {
pipeline.fireChannelInactive();
@ -578,14 +594,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
bufSize = ctx.outboundMessageBuffer().size();
}
long checkpoint = writeCounter + bufSize;
if (future instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) future;
cp.flushCheckpoint(checkpoint);
flushCheckpoints.add(cp);
} else {
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future));
}
flushFutureNotifier.addFlushFuture(future, bufSize);
}
if (!inFlushNow) { // Avoid re-entrance
@ -596,7 +605,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Event loop will call flushNow() later by itself.
}
} catch (Throwable t) {
notifyFlushFutures(t);
flushFutureNotifier.notifyFlushFutures(t);
pipeline.fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
@ -643,7 +652,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
final int newSize = out.readableBytes();
final int writtenBytes = oldSize - newSize;
if (writtenBytes > 0) {
writeCounter += writtenBytes;
flushFutureNotifier.increaseWriteCounter(writtenBytes);
if (newSize == 0) {
out.discardReadBytes();
}
@ -657,14 +666,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} catch (Throwable t) {
cause = t;
} finally {
writeCounter += oldSize - out.size();
flushFutureNotifier.increaseWriteCounter(oldSize - out.size());
}
}
if (cause == null) {
notifyFlushFutures();
flushFutureNotifier.notifyFlushFutures();
} else {
notifyFlushFutures(cause);
flushFutureNotifier.notifyFlushFutures(cause);
pipeline.fireExceptionCaught(cause);
if (cause instanceof IOException) {
close(voidFuture());
@ -725,92 +734,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract boolean isFlushPending();
protected void notifyFlushFutures() {
notifyFlushFutures(0);
}
protected void notifyFlushFutures(long writtenBytes) {
writeCounter += writtenBytes;
if (flushCheckpoints.isEmpty()) {
return;
}
final long writeCounter = this.writeCounter;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.peek();
if (cp == null) {
// Reset the counter if there's nothing in the notification list.
this.writeCounter = 0;
break;
}
if (cp.flushCheckpoint() > writeCounter) {
if (writeCounter > 0 && flushCheckpoints.size() == 1) {
this.writeCounter = 0;
cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
}
break;
}
flushCheckpoints.remove();
cp.future().setSuccess();
}
// Avoid overflow
final long newWriteCounter = this.writeCounter;
if (newWriteCounter >= 0x1000000000000000L) {
// Reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
this.writeCounter = 0;
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
}
}
}
protected void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
abstract static class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);
abstract ChannelFuture future();
}
private static class DefaultFlushCheckpoint extends FlushCheckpoint {
private long checkpoint;
private final ChannelFuture future;
DefaultFlushCheckpoint(long checkpoint, ChannelFuture future) {
this.checkpoint = checkpoint;
this.future = future;
}
@Override
long flushCheckpoint() {
return checkpoint;
}
@Override
void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
ChannelFuture future() {
return future;
}
}
private final class CloseFuture extends DefaultChannelFuture implements ChannelFuture.Unsafe {
CloseFuture(AbstractChannel ch) {

View File

@ -44,12 +44,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
@Override
public ByteBuf outboundByteBuffer() {
throw new NoSuchBufferException();
throw new NoSuchBufferException(String.format(
"%s does not have an outbound buffer.", ServerChannel.class.getSimpleName()));
}
@Override
public MessageBuf<Object> outboundMessageBuffer() {
throw new NoSuchBufferException();
throw new NoSuchBufferException(String.format(
"%s does not have an outbound buffer.", ServerChannel.class.getSimpleName()));
}
@Override

View File

@ -113,6 +113,9 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
Integer id();
/**
* Return the {@link EventLoop} this {@link Channel} was registered too.
*/
EventLoop eventLoop();
/**
@ -138,6 +141,9 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
boolean isRegistered();
boolean isActive();
/**
* Return the {@link ChannelMetadata} of the {@link Channel} which describe the nature of the {@link Channel}.
*/
ChannelMetadata metadata();
ByteBuf outboundByteBuffer();
@ -176,25 +182,99 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
*/
ChannelFuture closeFuture();
/**
* <strong>Caution</strong> for transport implementations use only!
*/
Unsafe unsafe();
/**
* <strong>Unsafe</strong> operations that should <strong>never</strong> be called
* from user-code. These methods are only provided to implement the actual transport.
*/
interface Unsafe {
/**
* Return the {@link ChannelHandlerContext} which is directly connected to the outbound of the
* underlying transport.
*/
ChannelHandlerContext directOutboundContext();
/**
* Return a {@link VoidChannelFuture}. This method always return the same instance.
*/
ChannelFuture voidFuture();
/**
* Return the {@link SocketAddress} to which is bound local or
* <code>null</code> if none.
*/
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or
* <code>null</code> if none is bound yet.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelFuture} with the {@link EventLoop} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(EventLoop eventLoop, ChannelFuture future);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelFuture} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelFuture future);
/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass <code>null</code> to it.
*
* The {@link ChannelFuture} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelFuture future);
/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelFuture} once the
* operation was complete.
*/
void disconnect(ChannelFuture future);
/**
* Close the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelFuture} once the
* operation was complete.
*/
void close(ChannelFuture future);
/**
* Deregister the {@link Channel} of the {@link ChannelFuture} from {@link EventLoop} and notify the
* {@link ChannelFuture} once the operation was complete.
*/
void deregister(ChannelFuture future);
/**
* Flush out all data that was buffered in the buffer of the {@link #directOutboundContext()} and was not
* flushed out yet. After that is done the {@link ChannelFuture} will get notified
*/
void flush(ChannelFuture future);
/**
* Flush out all data now.
*/
void flushNow();
/**
* Suspend reads from the underlying transport, which basicly has the effect of no new data that will
* get dispatched.
*/
void suspendRead();
/**
* Resume reads from the underlying transport. If {@link #suspendRead()} was not called before, this
* has no effect.
*/
void resumeRead();
}
}

View File

@ -60,6 +60,9 @@ import java.util.Map;
*/
public interface ChannelConfig {
/**
* Return all set {@link ChannelOption}'s.
*/
Map<ChannelOption<?>, Object> getOptions();
/**
@ -67,6 +70,9 @@ public interface ChannelConfig {
*/
boolean setOptions(Map<ChannelOption<?>, ?> options);
/**
* Return the value of the given {@link ChannelOption}
*/
<T> T getOption(ChannelOption<T> option);
/**

View File

@ -0,0 +1,140 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.ArrayDeque;
import java.util.Queue;
public class ChannelFlushFutureNotifier {
private long writeCounter;
private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
public void addFlushFuture(ChannelFuture future, int pendingDataSize) {
long checkpoint = writeCounter + pendingDataSize;
if (future instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) future;
cp.flushCheckpoint(checkpoint);
flushCheckpoints.add(cp);
} else {
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future));
}
}
public void increaseWriteCounter(long delta) {
writeCounter += delta;
}
public void notifyFlushFutures() {
notifyFlushFutures0(null);
}
public void notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
}
public void notifyFlushFutures(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1);
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause2);
}
}
private void notifyFlushFutures0(Throwable cause) {
if (flushCheckpoints.isEmpty()) {
writeCounter = 0;
return;
}
final long writeCounter = this.writeCounter;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.peek();
if (cp == null) {
// Reset the counter if there's nothing in the notification list.
this.writeCounter = 0;
break;
}
if (cp.flushCheckpoint() > writeCounter) {
if (writeCounter > 0 && flushCheckpoints.size() == 1) {
this.writeCounter = 0;
cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
}
break;
}
flushCheckpoints.remove();
if (cause == null) {
cp.future().setSuccess();
} else {
cp.future().setFailure(cause);
}
}
// Avoid overflow
final long newWriteCounter = this.writeCounter;
if (newWriteCounter >= 0x1000000000000000L) {
// Reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
this.writeCounter = 0;
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
}
}
}
abstract static class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);
abstract ChannelFuture future();
}
private static class DefaultFlushCheckpoint extends FlushCheckpoint {
private long checkpoint;
private final ChannelFuture future;
DefaultFlushCheckpoint(long checkpoint, ChannelFuture future) {
this.checkpoint = checkpoint;
this.future = future;
}
@Override
long flushCheckpoint() {
return checkpoint;
}
@Override
void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
ChannelFuture future() {
return future;
}
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@ -162,7 +163,7 @@ import java.util.concurrent.TimeUnit;
* @apiviz.landmark
* @apiviz.owns io.netty.channel.ChannelFutureListener - - notifies
*/
public interface ChannelFuture {
public interface ChannelFuture extends Future<Void> {
/**
* Returns a channel where the I/O operation associated with this
@ -175,12 +176,14 @@ public interface ChannelFuture {
* complete, regardless of whether the operation was successful, failed,
* or cancelled.
*/
@Override
boolean isDone();
/**
* Returns {@code true} if and only if this future was
* cancelled by a {@link #cancel()} method.
*/
@Override
boolean isCancelled();
/**

View File

@ -15,8 +15,28 @@
*/
package io.netty.channel;
/**
* Factory which is responsible to create new {@link ChannelFuture}'s
*
*/
public interface ChannelFutureFactory {
/**
* Create a new {@link ChannelFuture}
*/
ChannelFuture newFuture();
/**
* Create a new {@link ChannelFuture} which is marked as successes already. So {@link ChannelFuture#isSuccess()}
* will return <code>true</code>. All {@link ChannelFutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
ChannelFuture newSucceededFuture();
/**
* Create a new {@link ChannelFuture} which is marked as fakued already. So {@link ChannelFuture#isSuccess()}
* will return <code>false</code>. All {@link ChannelFutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
ChannelFuture newFailedFuture(Throwable cause);
}

View File

@ -208,12 +208,34 @@ import java.nio.channels.Channels;
*/
public interface ChannelHandler {
/**
* Gets called before the {@link ChannelHandler} is added to the actual context.
*/
void beforeAdd(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was added to the actual context.
*/
void afterAdd(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called before the {@link ChannelHandler} is removed from the actual context.
*/
void beforeRemove(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context.
*/
void afterRemove(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**

View File

@ -128,34 +128,152 @@ import java.util.Set;
public interface ChannelHandlerContext
extends AttributeMap, ChannelFutureFactory,
ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
*/
Channel channel();
/**
* Return the {@link ChannelPipeline} which belongs this {@link ChannelHandlerContext}.
*/
ChannelPipeline pipeline();
/**
* The {@link EventExecutor} that is used to dispatch the events. This can also be used to directly
* submit tasks that get executed in the event loop. For more informations please refer to the
* {@link EventExecutor} javadocs.
*/
EventExecutor executor();
/**
* The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
* was added to the {@link ChannelPipeline}. This name can also be used to access the registered
* {@link ChannelHandler} from the {@link ChannelPipeline}.
*/
String name();
ChannelHandler handler();
Set<ChannelHandlerType> type();
/**
* The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
*/
ChannelHandler handler();
Set<ChannelHandlerType> types();
/**
* Return <code>true</code> if the {@link ChannelHandlerContext} has an {@link ByteBuf} bound for inbound
* which can be used.
*/
boolean hasInboundByteBuffer();
/**
* Return <code>true</code> if the {@link ChannelHandlerContext} has a {@link MessageBuf} bound for inbound
* which can be used.
*/
boolean hasInboundMessageBuffer();
/**
* Return the bound {@link ByteBuf} for inbound data if {@link #hasInboundByteBuffer()} returned
* <code>true</code>. If {@link #hasInboundByteBuffer()} returned <code>false</code> it will throw a
* {@link UnsupportedOperationException}
*/
ByteBuf inboundByteBuffer();
/**
* Return the bound {@link MessageBuf} for inbound data if {@link #hasInboundMessageBuffer()} returned
* <code>true</code>. If {@link #hasInboundMessageBuffer()} returned <code>false</code> it will throw a
* {@link UnsupportedOperationException}.
*/
<T> MessageBuf<T> inboundMessageBuffer();
/**
* Return <code>true</code> if the {@link ChannelHandlerContext} has an {@link ByteBuf} bound for outbound
* data which can be used.
*
*/
boolean hasOutboundByteBuffer();
/**
* Return <code>true</code> if the {@link ChannelHandlerContext} has a {@link MessageBuf} bound for outbound
* which can be used.
*/
boolean hasOutboundMessageBuffer();
/**
* Return the bound {@link ByteBuf} for outbound data if {@link #hasOutboundByteBuffer()} returned
* <code>true</code>. If {@link #hasOutboundByteBuffer()} returned <code>false</code> it will throw
* a {@link UnsupportedOperationException}.
*/
ByteBuf outboundByteBuffer();
/**
* Return the bound {@link MessageBuf} for outbound data if {@link #hasOutboundMessageBuffer()} returned
* <code>true</code>. If {@link #hasOutboundMessageBuffer()} returned <code>false</code> it will throw a
* {@link UnsupportedOperationException}
*/
<T> MessageBuf<T> outboundMessageBuffer();
/**
* Return <code>true</code> if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling
* inbound data.
*/
boolean hasNextInboundByteBuffer();
/**
* Return <code>true</code> if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
* inbound data.
*/
boolean hasNextInboundMessageBuffer();
/**
* Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextInboundByteBuffer()}
* returned <code>true</code>, otherwise a {@link UnsupportedOperationException} is thrown.
*/
ByteBuf nextInboundByteBuffer();
/**
* Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if
* {@link #hasNextInboundMessageBuffer()} returned <code>true</code>, otherwise a
* {@link UnsupportedOperationException} is thrown.
*/
MessageBuf<Object> nextInboundMessageBuffer();
/**
* Return <code>true</code> if the next {@link ChannelHandlerContext} has a {@link ByteBuf} for handling outbound
* data.
*/
boolean hasNextOutboundByteBuffer();
/**
* Return <code>true</code> if the next {@link ChannelHandlerContext} has a {@link MessageBuf} for handling
* outbound data.
*/
boolean hasNextOutboundMessageBuffer();
/**
* Return the {@link ByteBuf} of the next {@link ChannelHandlerContext} if {@link #hasNextOutboundByteBuffer()}
* returned <code>true</code>, otherwise a {@link UnsupportedOperationException} is thrown.
*/
ByteBuf nextOutboundByteBuffer();
/**
* Return the {@link MessageBuf} of the next {@link ChannelHandlerContext} if
* {@link #hasNextOutboundMessageBuffer()} returned <code>true</code>, otherwise a
* {@link UnsupportedOperationException} is thrown.
*/
MessageBuf<Object> nextOutboundMessageBuffer();
/**
* Return <code>true</code> if the {@link ChannelHandlerContext} was marked as readable. This basically means
* that once its not readable anymore no new data will be read from the transport and passed down the
* {@link ChannelPipeline}.
*
* Only if all {@link ChannelHandlerContext}'s {@link #isReadable()} return <code>true</code>, the data is
* passed again down the {@link ChannelPipeline}.
*/
boolean isReadable();
/**
* Mark the {@link ChannelHandlerContext} as readable or suspend it. See {@link #isReadable()}
*/
void readable(boolean readable);
}

View File

@ -15,6 +15,10 @@
*/
package io.netty.channel;
/**
* Define the type of a {@link ChannelHandler}
*
*/
public enum ChannelHandlerType {
STATE(0),
INBOUND(0),
@ -24,6 +28,9 @@ public enum ChannelHandlerType {
final int direction; // 0 - up (inbound), 1 - down (outbound)
ChannelHandlerType(int direction) {
if (direction != 0 && direction != 1) {
throw new IllegalArgumentException("direction must be either 0 or 1");
}
this.direction = direction;
}
}

View File

@ -17,6 +17,10 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
/**
* {@link ChannelInboundHandler} which offers a {@link ByteBuf} to store inbound data in.
*
*/
public interface ChannelInboundByteHandler extends ChannelInboundHandler {
@Override
ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;

View File

@ -19,9 +19,19 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* Abstract base class for {@link ChannelInboundHandlerAdapter} which should be extended by the user to
* get notified once more data is ready to get consumed from the inbound {@link ByteBuf}.
*
* This implementation is a good starting point for must users.
*/
public abstract class ChannelInboundByteHandlerAdapter
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {
/**
* Create a new unpooled {@link ByteBuf} by default. Sub-classes may override this to offer a more
* optimized implementation.
*/
@Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return Unpooled.buffer();
@ -34,10 +44,15 @@ public abstract class ChannelInboundByteHandlerAdapter
inboundBufferUpdated(ctx, in);
} finally {
if (!in.readable()) {
in.discardReadBytes();
in.unsafe().discardSomeReadBytes();
}
}
}
/**
* Callback which will get notifed once the given {@link ByteBuf} received more data to read. What will be done
* with the data at this point is up to the implementation.
* Implementations may choose to read it or just let it in the buffer to read it later.
*/
public abstract void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception;
}

View File

@ -17,6 +17,12 @@ package io.netty.channel;
import io.netty.buffer.ChannelBuf;
/**
* {@link ChannelStateHandler} which handles inbound data.
*/
public interface ChannelInboundHandler extends ChannelStateHandler {
/**
* Return the {@link ChannelBuf} which will be used for inbound data for the given {@link ChannelHandlerContext}.
*/
ChannelBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -18,6 +18,10 @@ package io.netty.channel;
public abstract class ChannelInboundHandlerAdapter
extends ChannelStateHandlerAdapter implements ChannelInboundHandler {
/**
* Does nothing by default. Sub-classes may override this if needed.
*/
@Override
public abstract void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -17,7 +17,14 @@ package io.netty.channel;
import io.netty.buffer.MessageBuf;
/**
* Special {@link ChannelInboundHandler} which store the inbound data in a {@link MessageBuf} for futher processing.
*/
public interface ChannelInboundMessageHandler<I> extends ChannelInboundHandler {
/**
* Return the {@link MessageBuf} which will be used for inbound data to store.
*/
@Override
MessageBuf<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -28,6 +28,8 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
@Override
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
beginMessageReceived(ctx);
MessageBuf<I> in = ctx.inboundMessageBuffer();
for (;;) {
I msg = in.poll();
@ -40,7 +42,11 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
ctx.fireExceptionCaught(t);
}
}
endMessageReceived(ctx);
}
public void beginMessageReceived(ChannelHandlerContext ctx) throws Exception { }
public abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
public void endMessageReceived(ChannelHandlerContext ctx) throws Exception { }
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
public final class ChannelInputShutdownEvent {
public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent();
private ChannelInputShutdownEvent() { }
}

View File

@ -32,6 +32,9 @@ public class ChannelOption<T> extends UniqueName {
new ChannelOption<Integer>("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT =
new ChannelOption<Integer>("WRITE_SPIN_COUNT");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE =
new ChannelOption<Boolean>("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> SO_BROADCAST =
new ChannelOption<Boolean>("SO_BROADCAST");

View File

@ -454,6 +454,13 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelHandler first();
/**
* Returns the context of the first {@link ChannelHandler} in this pipeline.
*
* @return the context of the first handler. {@code null} if this pipeline is empty.
*/
ChannelHandlerContext firstContext();
/**
* Returns the last {@link ChannelHandler} in this pipeline.
*
@ -461,6 +468,13 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI
*/
ChannelHandler last();
/**
* Returns the context of the last {@link ChannelHandler} in this pipeline.
*
* @return the context of the last handler. {@code null} if this pipeline is empty.
*/
ChannelHandlerContext lastContext();
/**
* Returns the {@link ChannelHandler} with the specified name in this
* pipeline.

View File

@ -15,12 +15,23 @@
*/
package io.netty.channel;
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
*/
public interface ChannelStateHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* The inbound buffer of the {@link ChannelHandlerContext} was updated with new data.
* This means something may be ready to get processed by the actual {@link ChannelStateHandler}
* implementation. It's up to the implementation to consume it or keep it in the buffer
* to wait for more data and consume it later.
*/
void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}

View File

@ -31,10 +31,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public final class TaskScheduler {
public final class ChannelTaskScheduler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(TaskScheduler.class);
InternalLoggerFactory.getInstance(ChannelTaskScheduler.class);
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
private static final long START_TIME = System.nanoTime();
@ -55,7 +55,7 @@ public final class TaskScheduler {
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
public TaskScheduler(ThreadFactory threadFactory) {
public ChannelTaskScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

View File

@ -121,6 +121,11 @@ public abstract class CompleteChannelFuture implements ChannelFuture {
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;

View File

@ -16,7 +16,7 @@
package io.netty.channel;
import static java.util.concurrent.TimeUnit.*;
import io.netty.channel.AbstractChannel.FlushCheckpoint;
import io.netty.channel.ChannelFlushFutureNotifier.FlushCheckpoint;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
@ -24,7 +24,9 @@ import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* The default {@link ChannelFuture} implementation. It is recommended to
@ -181,6 +183,32 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
return this;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return null;
} else {
throw new ExecutionException(cause);
}
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
if (!await(timeout, unit)) {
throw new TimeoutException();
}
Throwable cause = cause();
if (cause == null) {
return null;
} else {
throw new ExecutionException(cause);
}
}
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
@ -388,6 +416,11 @@ public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFutu
return true;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return cancel();
}
private void notifyListeners() {
// This method doesn't need synchronization because:
// 1) This method is always called after synchronized (this) block.

View File

@ -349,7 +349,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
@Override
public Set<ChannelHandlerType> type() {
public Set<ChannelHandlerType> types() {
return type;
}
@ -366,7 +366,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ByteBuf inboundByteBuffer() {
if (inByteBuf == null) {
throw new NoSuchBufferException();
if (handler instanceof ChannelInboundHandler) {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no inbound byte buffer; it implements %s, but " +
"its newInboundBuffer() method created a %s.",
name, ChannelInboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no inbound byte buffer; it does not implement %s.",
name, ChannelInboundHandler.class.getSimpleName()));
}
}
return inByteBuf;
}
@ -375,7 +385,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@SuppressWarnings("unchecked")
public <T> MessageBuf<T> inboundMessageBuffer() {
if (inMsgBuf == null) {
throw new NoSuchBufferException();
if (handler instanceof ChannelInboundHandler) {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no inbound message buffer; it implements %s, but " +
"its newInboundBuffer() method created a %s.",
name, ChannelInboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no inbound message buffer; it does not implement %s.",
name, ChannelInboundHandler.class.getSimpleName()));
}
}
return (MessageBuf<T>) inMsgBuf;
}
@ -393,7 +413,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override
public ByteBuf outboundByteBuffer() {
if (outByteBuf == null) {
throw new NoSuchBufferException();
if (handler instanceof ChannelOutboundHandler) {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no outbound byte buffer; it implements %s, but " +
"its newOutboundBuffer() method created a %s.",
name, ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no outbound byte buffer; it does not implement %s.",
name, ChannelOutboundHandler.class.getSimpleName()));
}
}
return outByteBuf;
}
@ -402,7 +432,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@SuppressWarnings("unchecked")
public <T> MessageBuf<T> outboundMessageBuffer() {
if (outMsgBuf == null) {
throw new NoSuchBufferException();
if (handler instanceof ChannelOutboundHandler) {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no outbound message buffer; it implements %s, but " +
"its newOutboundBuffer() method created a %s.",
name, ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the handler '%s' has no outbound message buffer; it does not implement %s.",
name, ChannelOutboundHandler.class.getSimpleName()));
}
}
return (MessageBuf<T>) outMsgBuf;
}
@ -451,7 +491,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
if (prev != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose inbound buffer is %s.",
name, ChannelInboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose inbound buffer is %s.",
ChannelInboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
}
}
if (ctx.inByteBuf != null) {
if (ctx.executor().inEventLoop(currentThread)) {
@ -477,7 +527,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
if (prev != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose inbound buffer is %s.",
name, ChannelInboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose inbound buffer is %s.",
ChannelInboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
}
}
if (ctx.inMsgBuf != null) {

View File

@ -18,7 +18,6 @@ package io.netty.channel;
import static io.netty.channel.DefaultChannelHandlerContext.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuf;
import io.netty.buffer.ChannelBufType;
import io.netty.buffer.MessageBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultChannelHandlerContext.ByteBridge;
@ -744,7 +743,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandler first() {
public ChannelHandler first() {
DefaultChannelHandlerContext first = head.next;
if (first == null) {
return null;
@ -753,7 +752,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandler last() {
public ChannelHandlerContext firstContext() {
return head.next;
}
@Override
public ChannelHandler last() {
DefaultChannelHandlerContext last = tail;
if (last == head || last == null) {
return null;
@ -762,8 +766,17 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandler get(String name) {
DefaultChannelHandlerContext ctx = name2ctx.get(name);
public ChannelHandlerContext lastContext() {
DefaultChannelHandlerContext last = tail;
if (last == head || last == null) {
return null;
}
return last;
}
@Override
public ChannelHandler get(String name) {
ChannelHandlerContext ctx = context(name);
if (ctx == null) {
return null;
} else {
@ -772,7 +785,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
public <T extends ChannelHandler> T get(Class<T> handlerType) {
ChannelHandlerContext ctx = context(handlerType);
if (ctx == null) {
return null;
@ -782,93 +795,78 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
@Override
public synchronized ChannelHandlerContext context(String name) {
public ChannelHandlerContext context(String name) {
if (name == null) {
throw new NullPointerException("name");
}
return name2ctx.get(name);
synchronized (this) {
return name2ctx.get(name);
}
}
@Override
public synchronized ChannelHandlerContext context(ChannelHandler handler) {
public ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
if (name2ctx.isEmpty()) {
return null;
}
DefaultChannelHandlerContext ctx = head;
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
if (ctx == null) {
break;
}
}
return null;
}
@Override
public synchronized ChannelHandlerContext context(
Class<? extends ChannelHandler> handlerType) {
public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
if (handlerType == null) {
throw new NullPointerException("handlerType");
}
if (name2ctx.isEmpty()) {
return null;
}
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
return ctx;
}
ctx = ctx.next;
if (ctx == null) {
break;
}
}
return null;
}
@Override
public List<String> names() {
List<String> list = new ArrayList<String>();
if (name2ctx.isEmpty()) {
return list;
}
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return list;
}
list.add(ctx.name());
ctx = ctx.next;
if (ctx == null) {
break;
}
}
return list;
}
@Override
public Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
if (name2ctx.isEmpty()) {
return map;
}
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return map;
}
map.put(ctx.name(), ctx.handler());
ctx = ctx.next;
if (ctx == null) {
break;
}
}
return map;
}
/**
@ -881,15 +879,21 @@ public class DefaultChannelPipeline implements ChannelPipeline {
buf.append('{');
DefaultChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
break;
}
buf.append('(');
buf.append(ctx.name());
buf.append(" = ");
buf.append(ctx.handler().getClass().getName());
buf.append(')');
ctx = ctx.next;
if (ctx == null) {
break;
}
buf.append(", ");
}
buf.append('}');
@ -898,19 +902,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public MessageBuf<Object> inboundMessageBuffer() {
if (channel.metadata().bufferType() != ChannelBufType.MESSAGE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a message buffer.");
}
return head.nextInboundMessageBuffer();
}
@Override
public ByteBuf inboundByteBuffer() {
if (channel.metadata().bufferType() != ChannelBufType.BYTE) {
throw new NoSuchBufferException(
"The first inbound buffer of this channel must be a byte buffer.");
}
return head.nextInboundByteBuffer();
}
@ -951,10 +947,21 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
ByteBuf nextOutboundByteBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
ByteBuf.class.getSimpleName()));
}
}
if (ctx.outByteBuf != null) {
@ -976,10 +983,21 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
MessageBuf<Object> nextOutboundMessageBuffer(DefaultChannelHandlerContext ctx) {
final DefaultChannelHandlerContext initialCtx = ctx;
final Thread currentThread = Thread.currentThread();
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s whose outbound buffer is %s.",
initialCtx.next.name(), ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s whose outbound buffer is %s.",
ChannelOutboundHandler.class.getSimpleName(),
MessageBuf.class.getSimpleName()));
}
}
if (ctx.outMsgBuf != null) {
@ -1288,11 +1306,25 @@ public class DefaultChannelPipeline implements ChannelPipeline {
}
validateFuture(future);
final DefaultChannelHandlerContext initialCtx = ctx;
EventExecutor executor;
boolean msgBuf = false;
for (;;) {
if (ctx == null) {
throw new NoSuchBufferException();
if (initialCtx.next != null) {
throw new NoSuchBufferException(String.format(
"the handler '%s' could not find a %s which accepts a %s, and " +
"the transport does not accept it as-is.",
initialCtx.next.name(),
ChannelOutboundHandler.class.getSimpleName(),
message.getClass().getSimpleName()));
} else {
throw new NoSuchBufferException(String.format(
"the pipeline does not contain a %s which accepts a %s, and " +
"the transport does not accept it as-is.",
ChannelOutboundHandler.class.getSimpleName(),
message.getClass().getSimpleName()));
}
}
if (ctx.hasOutboundMessageBuffer()) {

View File

@ -17,10 +17,15 @@ package io.netty.channel;
import java.util.concurrent.ThreadFactory;
/**
* Default {@link SingleThreadEventExecutor} implementation which just execute all submitted task in a
* serial fashion
*
*/
class DefaultEventExecutor extends SingleThreadEventExecutor {
DefaultEventExecutor(
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
DefaultEventExecutorGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -29,7 +29,7 @@ public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
return new DefaultEventExecutor(this, threadFactory, scheduler);
}
}

View File

@ -17,8 +17,36 @@ package io.netty.channel;
import java.util.concurrent.ScheduledExecutorService;
/**
* The {@link EventExecutor} is a special {@link ScheduledExecutorService} which comes
* with some handy methods to see if a {@link Thread} is executed in a event loop.
* Beside this it also extends the {@link EventExecutorGroup} to allow a generic way to
* access methods.
*
*/
public interface EventExecutor extends EventExecutorGroup, ScheduledExecutorService {
/**
* Returns a reference to itself.
*/
@Override
EventExecutor next();
/**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
* or <code>null</code> if it has no parent
*/
EventExecutorGroup parent();
/**
* Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
*/
boolean inEventLoop();
/**
* Return <code>true</code> if the given {@link Thread} is executed in the event loop,
* <code>false</code> otherwise.
*/
boolean inEventLoop(Thread thread);
}

View File

@ -18,6 +18,12 @@ package io.netty.channel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The {@link EventExecutorGroup} is responsible to provide {@link EventExecutor}'s to use via its
* {@link #next()} method. Beside this it also is responsible to handle their live-cycle and allows
* to shut them down in a global fashion.
*
*/
public interface EventExecutorGroup {
/**

View File

@ -15,10 +15,27 @@
*/
package io.netty.channel;
/**
* Special {@link EventExecutorGroup} which allows to register {@link Channel}'s that get
* processed for later selection during the event loop.
*
*/
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
*/
@Override
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(Channel channel, ChannelFuture future);
}

View File

@ -16,6 +16,9 @@
package io.netty.channel;
import java.nio.channels.Channels;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* The {@link CompleteChannelFuture} which is failed already. It is
@ -71,4 +74,15 @@ public class FailedChannelFuture extends CompleteChannelFuture {
throw new ChannelException(cause);
}
@Override
public Void get() throws InterruptedException, ExecutionException {
throw new ExecutionException(cause);
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
throw new ExecutionException(cause);
}
}

View File

@ -24,7 +24,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
private static final int DEFAULT_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final AtomicInteger poolId = new AtomicInteger();
final TaskScheduler scheduler;
final ChannelTaskScheduler scheduler;
private final EventExecutor[] children;
private final AtomicInteger childIndex = new AtomicInteger();
@ -41,7 +41,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
threadFactory = new DefaultThreadFactory();
}
scheduler = new TaskScheduler(threadFactory);
scheduler = new ChannelTaskScheduler(threadFactory);
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
@ -67,7 +67,7 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
}
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception;
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception;
@Override
public void shutdown() {

View File

@ -22,14 +22,9 @@ package io.netty.channel;
*/
public class NoSuchBufferException extends ChannelPipelineException {
private static final String DEFAULT_MESSAGE =
"Could not find a suitable destination buffer. Double-check if the pipeline is " +
"configured correctly and its handlers works as expected.";
private static final long serialVersionUID = -131650785896627090L;
public NoSuchBufferException() {
this(DEFAULT_MESSAGE);
}
public NoSuchBufferException(String message, Throwable cause) {

View File

@ -34,6 +34,10 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* Abstract base class for {@link EventExecutor}'s that execute all its submitted tasks in a single thread.
*
*/
public abstract class SingleThreadEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger =
@ -51,13 +55,13 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
private final TaskScheduler scheduler;
private final ChannelTaskScheduler scheduler;
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
EventExecutorGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

View File

@ -20,7 +20,7 @@ import java.util.concurrent.ThreadFactory;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected SingleThreadEventLoop(
EventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
EventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -16,6 +16,9 @@
package io.netty.channel;
import java.nio.channels.Channels;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* The {@link CompleteChannelFuture} which is succeeded already. It is
@ -52,4 +55,16 @@ public class SucceededChannelFuture extends CompleteChannelFuture {
public ChannelFuture syncUninterruptibly() {
return this;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}
}

View File

@ -15,7 +15,9 @@
*/
package io.netty.channel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class VoidChannelFuture implements ChannelFuture.Unsafe {
@ -120,6 +122,19 @@ public class VoidChannelFuture implements ChannelFuture.Unsafe {
return this;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
fail();
return null;
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
fail();
return null;
}
@Override
public boolean setProgress(long amount, long current, long total) {
return false;
@ -140,6 +155,11 @@ public class VoidChannelFuture implements ChannelFuture.Unsafe {
return false;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
private static void fail() {
throw new IllegalStateException("void future");
}

View File

@ -16,14 +16,14 @@
package io.netty.channel.local;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.TaskScheduler;
import io.netty.channel.ChannelTaskScheduler;
import java.util.concurrent.ThreadFactory;
final class LocalEventLoop extends SingleThreadEventLoop {
LocalEventLoop(
LocalEventLoopGroup parent, ThreadFactory threadFactory, TaskScheduler scheduler) {
LocalEventLoopGroup parent, ThreadFactory threadFactory, ChannelTaskScheduler scheduler) {
super(parent, threadFactory, scheduler);
}

View File

@ -17,7 +17,7 @@ package io.netty.channel.local;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.TaskScheduler;
import io.netty.channel.ChannelTaskScheduler;
import java.util.concurrent.ThreadFactory;
@ -37,7 +37,7 @@ public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, TaskScheduler scheduler, Object... args) throws Exception {
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {
return new LocalEventLoop(this, threadFactory, scheduler);
}
}

View File

@ -19,6 +19,9 @@ import static io.netty.channel.ChannelOption.*;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DetectionUtil;
import java.io.IOException;
import java.net.DatagramSocket;
@ -33,6 +36,8 @@ import java.util.Map;
*/
public class DefaultDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultDatagramChannelConfig.class);
private static final int DEFAULT_RECEIVE_PACKET_SIZE = 2048;
private final DatagramSocket socket;
@ -137,6 +142,19 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
@Override
public void setBroadcast(boolean broadcast) {
try {
// See: https://github.com/netty/netty/issues/576
if (broadcast &&
!DetectionUtil.isWindows() && !DetectionUtil.isRoot() &&
!socket.getLocalAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; setting the SO_BROADCAST flag " +
"anyway as requested on the socket which is bound to " +
socket.getLocalSocketAddress() + ".");
}
socket.setBroadcast(broadcast);
} catch (SocketException e) {
throw new ChannelException(e);

View File

@ -31,6 +31,7 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
implements SocketChannelConfig {
private final Socket socket;
private volatile boolean allowHalfClosure;
/**
* Creates a new instance.
@ -46,7 +47,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
public Map<ChannelOption<?>, Object> getOptions() {
return getOptions(
super.getOptions(),
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS);
SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
ALLOW_HALF_CLOSURE);
}
@Override
@ -72,6 +74,9 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
if (option == IP_TOS) {
return (T) Integer.valueOf(getTrafficClass());
}
if (option == ALLOW_HALF_CLOSURE) {
return (T) Boolean.valueOf(isAllowHalfClosure());
}
return super.getOption(option);
}
@ -94,6 +99,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else {
return super.setOption(option, value);
}
@ -236,4 +243,14 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
throw new ChannelException(e);
}
}
@Override
public boolean isAllowHalfClosure() {
return allowHalfClosure;
}
@Override
public void setAllowHalfClosure(boolean allowHalfClosure) {
this.allowHalfClosure = allowHalfClosure;
}
}

View File

@ -16,8 +16,10 @@
package io.netty.channel.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* A TCP/IP socket {@link Channel} which was either accepted by
@ -32,4 +34,21 @@ public interface SocketChannel extends Channel {
InetSocketAddress localAddress();
@Override
InetSocketAddress remoteAddress();
/**
* Returns {@code true} if and only if the remote peer shut down its output so that no more
* data is received from this channel. Note that the semantic of this method is different from
* that of {@link Socket#shutdownInput()} and {@link Socket#isInputShutdown()}.
*/
boolean isInputShutdown();
/**
* @see Socket#isOutputShutdown()
*/
boolean isOutputShutdown();
/**
* @see Socket#shutdownOutput()
*/
ChannelFuture shutdownOutput();
}

Some files were not shown because too many files have changed in this diff Show More