Merge branch '3' into jdk_epoll_bug_workaround

Conflicts:
	src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java
This commit is contained in:
Norman Maurer 2012-08-29 07:48:59 +02:00
commit eddca7583e
9 changed files with 120 additions and 92 deletions

View File

@ -226,11 +226,12 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
try { try {
for (;;) { for (;;) {
try { try {
if (selector.select(1000) > 0) { // Just do a blocking select without any timeout
// There was something selected if we reach this point, so clear // as this thread does not execute anything else.
// the selected keys selector.select();
selector.selectedKeys().clear(); // There was something selected if we reach this point, so clear
} // the selected keys
selector.selectedKeys().clear();
// accept connections in a for loop until no new connection is ready // accept connections in a for loop until no new connection is ready
for (;;) { for (;;) {

View File

@ -22,15 +22,16 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.SystemPropertyUtil;
final class SelectorUtil { final class SelectorUtil {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SelectorUtil.class); InternalLoggerFactory.getInstance(SelectorUtil.class);
static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
static final long DEFAULT_SELECT_TIMEOUT = 10;
static final long SELECT_TIMEOUT = 10; static final long SELECT_TIMEOUT;
static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT); static final long SELECT_TIMEOUT_NANOS;
// Workaround for JDK NIO bug. // Workaround for JDK NIO bug.
// //
@ -49,6 +50,16 @@ final class SelectorUtil {
logger.debug("Unable to get/set System Property '" + key + "'", e); logger.debug("Unable to get/set System Property '" + key + "'", e);
} }
} }
long selectTimeout;
try {
selectTimeout = Long.parseLong(SystemPropertyUtil.get("org.jboss.netty.selectTimeout",
String.valueOf(DEFAULT_SELECT_TIMEOUT)));
} catch (NumberFormatException e) {
selectTimeout = DEFAULT_SELECT_TIMEOUT;
}
SELECT_TIMEOUT = selectTimeout;
SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toMicros(SELECT_TIMEOUT);
logger.debug("Using select timeout of " + SELECT_TIMEOUT);
} }
static int select(Selector selector) throws IOException { static int select(Selector selector) throws IOException {

View File

@ -29,7 +29,7 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
/** /**
@ -37,7 +37,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
* @apiviz.landmark * @apiviz.landmark
* @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
*/ */
public class JdkZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler { public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
private final byte[] out = new byte[8192]; private final byte[] out = new byte[8192];
private final Deflater deflater; private final Deflater deflater;

View File

@ -27,7 +27,7 @@ import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder;
import org.jboss.netty.util.internal.jzlib.JZlib; import org.jboss.netty.util.internal.jzlib.JZlib;
import org.jboss.netty.util.internal.jzlib.ZStream; import org.jboss.netty.util.internal.jzlib.ZStream;
@ -37,7 +37,7 @@ import org.jboss.netty.util.internal.jzlib.ZStream;
* @apiviz.landmark * @apiviz.landmark
* @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
*/ */
public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler { public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler {
private static final byte[] EMPTY_ARRAY = new byte[0]; private static final byte[] EMPTY_ARRAY = new byte[0];

View File

@ -130,13 +130,10 @@ final class HttpPostBodyUtil {
*/ */
static class SeekAheadOptimize { static class SeekAheadOptimize {
byte[] bytes; byte[] bytes;
int readerIndex; int readerIndex;
int pos; int pos;
int origPos;
int limit; int limit;
ChannelBuffer buffer; ChannelBuffer buffer;
/** /**
@ -148,7 +145,8 @@ final class HttpPostBodyUtil {
} }
this.buffer = buffer; this.buffer = buffer;
bytes = buffer.array(); bytes = buffer.array();
pos = readerIndex = buffer.arrayOffset() + buffer.readerIndex(); readerIndex = buffer.readerIndex();
origPos = pos = buffer.arrayOffset() + readerIndex;
limit = buffer.arrayOffset() + buffer.writerIndex(); limit = buffer.arrayOffset() + buffer.writerIndex();
} }
@ -159,10 +157,19 @@ final class HttpPostBodyUtil {
*/ */
void setReadPosition(int minus) { void setReadPosition(int minus) {
pos -= minus; pos -= minus;
readerIndex = pos; readerIndex = getReadPosition(pos);
buffer.readerIndex(readerIndex); buffer.readerIndex(readerIndex);
} }
/**
*
* @param index raw index of the array (pos in general)
* @return the value equivalent of raw index to be used in readerIndex(value)
*/
int getReadPosition(int index) {
return index - origPos + readerIndex;
}
void clear() { void clear() {
buffer = null; buffer = null;
bytes = null; bytes = null;

View File

@ -849,13 +849,18 @@ public class HttpPostRequestDecoder {
/** /**
* Skip control Characters * Skip control Characters
* @throws NotEnoughDataDecoderException
*/ */
void skipControlCharacters() { void skipControlCharacters() throws NotEnoughDataDecoderException {
SeekAheadOptimize sao = null; SeekAheadOptimize sao = null;
try { try {
sao = new SeekAheadOptimize(undecodedChunk); sao = new SeekAheadOptimize(undecodedChunk);
} catch (SeekAheadNoBackArrayException e) { } catch (SeekAheadNoBackArrayException e) {
skipControlCharactersStandard(undecodedChunk); try {
skipControlCharactersStandard();
} catch (IndexOutOfBoundsException e1) {
throw new NotEnoughDataDecoderException(e1);
}
return; return;
} }
@ -866,13 +871,13 @@ public class HttpPostRequestDecoder {
return; return;
} }
} }
sao.setReadPosition(0); throw new NotEnoughDataDecoderException("Access out of bounds");
} }
static void skipControlCharactersStandard(ChannelBuffer buffer) { void skipControlCharactersStandard() {
for (;;) { for (;;) {
char c = (char) buffer.readUnsignedByte(); char c = (char) undecodedChunk.readUnsignedByte();
if (!Character.isISOControl(c) && !Character.isWhitespace(c)) { if (!Character.isISOControl(c) && !Character.isWhitespace(c)) {
buffer.readerIndex(buffer.readerIndex() - 1); undecodedChunk.readerIndex(undecodedChunk.readerIndex() - 1);
break; break;
} }
} }
@ -892,7 +897,12 @@ public class HttpPostRequestDecoder {
throws ErrorDataDecoderException { throws ErrorDataDecoderException {
// --AaB03x or --AaB03x-- // --AaB03x or --AaB03x--
int readerIndex = undecodedChunk.readerIndex(); int readerIndex = undecodedChunk.readerIndex();
skipControlCharacters(); try {
skipControlCharacters();
} catch (NotEnoughDataDecoderException e1) {
undecodedChunk.readerIndex(readerIndex);
return null;
}
skipOneLine(); skipOneLine();
String newline; String newline;
try { try {
@ -933,9 +943,9 @@ public class HttpPostRequestDecoder {
} }
// read many lines until empty line with newline found! Store all data // read many lines until empty line with newline found! Store all data
while (!skipOneLine()) { while (!skipOneLine()) {
skipControlCharacters();
String newline; String newline;
try { try {
skipControlCharacters();
newline = readLine(); newline = readLine();
} catch (NotEnoughDataDecoderException e) { } catch (NotEnoughDataDecoderException e) {
undecodedChunk.readerIndex(readerIndex); undecodedChunk.readerIndex(readerIndex);
@ -1594,8 +1604,8 @@ public class HttpPostRequestDecoder {
// found the decoder limit // found the decoder limit
boolean newLine = true; boolean newLine = true;
int index = 0; int index = 0;
int lastrealpos = sao.pos;
int lastPosition = undecodedChunk.readerIndex(); int lastPosition = undecodedChunk.readerIndex();
int setReadPosition = -1;
boolean found = false; boolean found = false;
while (sao.pos < sao.limit) { while (sao.pos < sao.limit) {
@ -1606,7 +1616,6 @@ public class HttpPostRequestDecoder {
index ++; index ++;
if (delimiter.length() == index) { if (delimiter.length() == index) {
found = true; found = true;
sao.setReadPosition(0);
break; break;
} }
continue; continue;
@ -1620,23 +1629,16 @@ public class HttpPostRequestDecoder {
if (nextByte == HttpConstants.LF) { if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
setReadPosition = sao.pos; lastrealpos = sao.pos - 2;
lastPosition = sao.pos - 2;
} }
} else {
// save last valid position
setReadPosition = sao.pos;
lastPosition = sao.pos;
} }
} else if (nextByte == HttpConstants.LF) { } else if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
setReadPosition = sao.pos; lastrealpos = sao.pos - 1;
lastPosition = sao.pos - 1;
} else { } else {
// save last valid position // save last valid position
setReadPosition = sao.pos; lastrealpos = sao.pos;
lastPosition = sao.pos;
} }
} }
} else { } else {
@ -1647,30 +1649,20 @@ public class HttpPostRequestDecoder {
if (nextByte == HttpConstants.LF) { if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
setReadPosition = sao.pos; lastrealpos = sao.pos - 2;
lastPosition = sao.pos - 2;
} }
} else {
// save last valid position
setReadPosition = sao.pos;
lastPosition = sao.pos;
} }
} else if (nextByte == HttpConstants.LF) { } else if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
setReadPosition = sao.pos; lastrealpos = sao.pos - 1;
lastPosition = sao.pos - 1;
} else { } else {
// save last valid position // save last valid position
setReadPosition = sao.pos; lastrealpos = sao.pos;
lastPosition = sao.pos;
} }
} }
} }
if (setReadPosition > 0) { lastPosition = sao.getReadPosition(lastrealpos);
sao.pos = setReadPosition;
sao.setReadPosition(0);
}
ChannelBuffer buffer = undecodedChunk.slice(readerIndex, lastPosition - readerIndex); ChannelBuffer buffer = undecodedChunk.slice(readerIndex, lastPosition - readerIndex);
if (found) { if (found) {
// found so lastPosition is correct and final // found so lastPosition is correct and final
@ -1809,7 +1801,7 @@ public class HttpPostRequestDecoder {
boolean newLine = true; boolean newLine = true;
int index = 0; int index = 0;
int lastPosition = undecodedChunk.readerIndex(); int lastPosition = undecodedChunk.readerIndex();
int setReadPosition = -1; int lastrealpos = sao.pos;
boolean found = false; boolean found = false;
while (sao.pos < sao.limit) { while (sao.pos < sao.limit) {
@ -1820,7 +1812,6 @@ public class HttpPostRequestDecoder {
index ++; index ++;
if (delimiter.length() == index) { if (delimiter.length() == index) {
found = true; found = true;
sao.setReadPosition(0);
break; break;
} }
continue; continue;
@ -1834,21 +1825,15 @@ public class HttpPostRequestDecoder {
if (nextByte == HttpConstants.LF) { if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
lastPosition = sao.pos - 2; lastrealpos = sao.pos - 2;
setReadPosition = sao.pos;
} }
} else {
lastPosition = sao.pos;
setReadPosition = sao.pos;
} }
} else if (nextByte == HttpConstants.LF) { } else if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
lastPosition = sao.pos - 1; lastrealpos = sao.pos - 1;
setReadPosition = sao.pos;
} else { } else {
lastPosition = sao.pos; lastrealpos = sao.pos;
setReadPosition = sao.pos;
} }
} }
} else { } else {
@ -1859,28 +1844,19 @@ public class HttpPostRequestDecoder {
if (nextByte == HttpConstants.LF) { if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
lastPosition = sao.pos - 2; lastrealpos = sao.pos - 2;
setReadPosition = sao.pos;
} }
} else {
lastPosition = sao.pos;
setReadPosition = sao.pos;
} }
} else if (nextByte == HttpConstants.LF) { } else if (nextByte == HttpConstants.LF) {
newLine = true; newLine = true;
index = 0; index = 0;
lastPosition = sao.pos - 1; lastrealpos = sao.pos - 1;
setReadPosition = sao.pos;
} else { } else {
lastPosition = sao.pos; lastrealpos = sao.pos;
setReadPosition = sao.pos;
} }
} }
} }
if (setReadPosition > 0) { lastPosition = sao.getReadPosition(lastrealpos);
sao.pos = setReadPosition;
sao.setReadPosition(0);
}
if (found) { if (found) {
// found so lastPosition is correct // found so lastPosition is correct
// but position is just after the delimiter (either close delimiter or simple one) // but position is just after the delimiter (either close delimiter or simple one)

View File

@ -57,13 +57,20 @@ public abstract class OneToOneEncoder implements ChannelDownstreamHandler {
} }
MessageEvent e = (MessageEvent) evt; MessageEvent e = (MessageEvent) evt;
if (!doEncode(ctx, e)) {
ctx.sendDownstream(e);
}
}
protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object originalMessage = e.getMessage(); Object originalMessage = e.getMessage();
Object encodedMessage = encode(ctx, e.getChannel(), originalMessage); Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
if (originalMessage == encodedMessage) { if (originalMessage == encodedMessage) {
ctx.sendDownstream(evt); return false;
} else if (encodedMessage != null) { } else if (encodedMessage != null) {
write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress()); write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
} }
return true;
} }
/** /**

View File

@ -0,0 +1,38 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.handler.codec.oneone;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
/**
* Special {@link OneToOneEncoder} which enforce strict ordering of encoding and writing. This
* class should get extend by implementations that needs this enforcement to guaranteer no corruption.
* Basically all "message" based {@link OneToOneEncoder} mostly don't need this, where "stream" based
* are often in need of it.
*
*/
public abstract class OneToOneStrictEncoder extends OneToOneEncoder {
@Override
protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// Synchronize on the channel to guaranteer the strict ordering
synchronized (ctx.getChannel()) {
return super.doEncode(ctx, e);
}
}
}

View File

@ -159,11 +159,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler {
} }
// Writes of compressed data must occur in order // Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() { Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
return; return;
@ -197,11 +193,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler {
} }
// Writes of compressed data must occur in order // Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() { Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
return; return;
@ -323,11 +315,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler {
} }
// Writes of compressed data must occur in order // Writes of compressed data must occur in order
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data);
e.getChannel().getPipeline().execute(new Runnable() { Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
public void run() {
Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress());
}
});
} }
return; return;