From 4e351f7399ffe737682b67d888bcbadfe0a868fa Mon Sep 17 00:00:00 2001 From: Jaen Saul Date: Mon, 20 Aug 2012 21:39:28 +0300 Subject: [PATCH 1/7] Do not write compressed SPDY frames out-of-band in another thread --- .../handler/codec/spdy/SpdyFrameEncoder.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java index d8bdd9b62c..e6ed2ff325 100644 --- a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -159,11 +159,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler { } // Writes of compressed data must occur in order final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); - e.getChannel().getPipeline().execute(new Runnable() { - public void run() { - Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); - } - }); + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); } return; @@ -197,11 +193,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler { } // Writes of compressed data must occur in order final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); - e.getChannel().getPipeline().execute(new Runnable() { - public void run() { - Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); - } - }); + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); } return; @@ -323,11 +315,7 @@ public class SpdyFrameEncoder implements ChannelDownstreamHandler { } // Writes of compressed data must occur in order final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); - e.getChannel().getPipeline().execute(new Runnable() { - public void run() { - Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); - } - }); + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); } return; From 5d07dea3b70c02852a864ceeeda82468986b95b2 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 26 Aug 2012 08:46:48 +0200 Subject: [PATCH 2/7] Use Selecor.select() to accept new Sockets to not need to schedule a timeout if not needed anyway. See #567 --- .../socket/nio/NioServerSocketPipelineSink.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 1e83d41418..b14d50162c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -226,11 +226,12 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { try { for (;;) { try { - if (selector.select(1000) > 0) { - // There was something selected if we reach this point, so clear - // the selected keys - selector.selectedKeys().clear(); - } + // Just do a blocking select without any timeout + // as this thread does not execute anything else. + selector.select(); + // There was something selected if we reach this point, so clear + // the selected keys + selector.selectedKeys().clear(); // accept connections in a for loop until no new connection is ready for (;;) { From 8b4f5933971311edb81be6e4f133390593901b31 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sun, 26 Aug 2012 09:02:59 +0200 Subject: [PATCH 3/7] Allow to adjust timeout of Selector.select(timeout) via org.jboss.netty.selectTimeout property. See #568 --- .../netty/channel/socket/nio/SelectorUtil.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java index bea75ce1bc..d639f45fbe 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/SelectorUtil.java @@ -21,12 +21,15 @@ import java.nio.channels.Selector; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.SystemPropertyUtil; final class SelectorUtil { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SelectorUtil.class); static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; + static final long DEFAULT_SELECT_TIMEOUT = 10; + static final long SELECT_TIMEOUT; // Workaround for JDK NIO bug. // @@ -45,11 +48,20 @@ final class SelectorUtil { logger.debug("Unable to get/set System Property '" + key + "'", e); } } + long selectTimeout; + try { + selectTimeout = Long.parseLong(SystemPropertyUtil.get("org.jboss.netty.selectTimeout", + String.valueOf(DEFAULT_SELECT_TIMEOUT))); + } catch (NumberFormatException e) { + selectTimeout = DEFAULT_SELECT_TIMEOUT; + } + SELECT_TIMEOUT = selectTimeout; + logger.debug("Using select timeout of " + SELECT_TIMEOUT); } static void select(Selector selector) throws IOException { try { - selector.select(10); + selector.select(SELECT_TIMEOUT); } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug( From e685e535f9dde1f90a0afddce07eb9d650eb20e6 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 28 Aug 2012 13:32:38 +0200 Subject: [PATCH 4/7] Introduce a new abstract class called OneToOneStrictEncoder which helps to ensure strict ordering. This should be used if that is needed like in the case of ZIP. See ##546 --- .../codec/compression/JdkZlibEncoder.java | 4 +- .../codec/compression/ZlibEncoder.java | 4 +- .../handler/codec/oneone/OneToOneEncoder.java | 9 ++++- .../codec/oneone/OneToOneStrictEncoder.java | 38 +++++++++++++++++++ 4 files changed, 50 insertions(+), 5 deletions(-) create mode 100644 src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneStrictEncoder.java diff --git a/src/main/java/org/jboss/netty/handler/codec/compression/JdkZlibEncoder.java b/src/main/java/org/jboss/netty/handler/codec/compression/JdkZlibEncoder.java index 59d78a67d3..61c370cb1f 100644 --- a/src/main/java/org/jboss/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/compression/JdkZlibEncoder.java @@ -29,7 +29,7 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.LifeCycleAwareChannelHandler; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder; /** @@ -37,7 +37,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; * @apiviz.landmark * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper */ -public class JdkZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler { +public class JdkZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler { private final byte[] out = new byte[8192]; private final Deflater deflater; diff --git a/src/main/java/org/jboss/netty/handler/codec/compression/ZlibEncoder.java b/src/main/java/org/jboss/netty/handler/codec/compression/ZlibEncoder.java index 8c38e9083a..c22dd99cb1 100644 --- a/src/main/java/org/jboss/netty/handler/codec/compression/ZlibEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/compression/ZlibEncoder.java @@ -27,7 +27,7 @@ import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.LifeCycleAwareChannelHandler; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import org.jboss.netty.handler.codec.oneone.OneToOneStrictEncoder; import org.jboss.netty.util.internal.jzlib.JZlib; import org.jboss.netty.util.internal.jzlib.ZStream; @@ -37,7 +37,7 @@ import org.jboss.netty.util.internal.jzlib.ZStream; * @apiviz.landmark * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper */ -public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler { +public class ZlibEncoder extends OneToOneStrictEncoder implements LifeCycleAwareChannelHandler { private static final byte[] EMPTY_ARRAY = new byte[0]; diff --git a/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneEncoder.java b/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneEncoder.java index 29c9d386eb..1dc29fa756 100644 --- a/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneEncoder.java @@ -57,13 +57,20 @@ public abstract class OneToOneEncoder implements ChannelDownstreamHandler { } MessageEvent e = (MessageEvent) evt; + if (!doEncode(ctx, e)) { + ctx.sendDownstream(e); + } + } + + protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Object originalMessage = e.getMessage(); Object encodedMessage = encode(ctx, e.getChannel(), originalMessage); if (originalMessage == encodedMessage) { - ctx.sendDownstream(evt); + return false; } else if (encodedMessage != null) { write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress()); } + return true; } /** diff --git a/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneStrictEncoder.java b/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneStrictEncoder.java new file mode 100644 index 0000000000..3d21e0d569 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/oneone/OneToOneStrictEncoder.java @@ -0,0 +1,38 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.jboss.netty.handler.codec.oneone; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; + +/** + * Special {@link OneToOneEncoder} which enforce strict ordering of encoding and writing. This + * class should get extend by implementations that needs this enforcement to guaranteer no corruption. + * Basically all "message" based {@link OneToOneEncoder} mostly don't need this, where "stream" based + * are often in need of it. + * + */ +public abstract class OneToOneStrictEncoder extends OneToOneEncoder { + + @Override + protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + // Synchronize on the channel to guaranteer the strict ordering + synchronized (ctx.getChannel()) { + return super.doEncode(ctx, e); + } + } +} From a3cedc8b472c2f5c80f836627bc8da6439ee1fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?= Date: Tue, 28 Aug 2012 16:23:35 +0300 Subject: [PATCH 5/7] First part of fix for issue #569 When moving to take into account arrayOffset, it should have been taken into account also in setReadPosition and other places. Fix it now... --- .../codec/http/multipart/HttpPostBodyUtil.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java index c0d923c83e..12a4a0e5e1 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java @@ -134,6 +134,8 @@ final class HttpPostBodyUtil { int readerIndex; int pos; + + int origPos; int limit; @@ -148,7 +150,8 @@ final class HttpPostBodyUtil { } this.buffer = buffer; bytes = buffer.array(); - pos = readerIndex = buffer.arrayOffset() + buffer.readerIndex(); + readerIndex = buffer.readerIndex(); + origPos = pos = buffer.arrayOffset() + readerIndex; limit = buffer.arrayOffset() + buffer.writerIndex(); } @@ -159,10 +162,19 @@ final class HttpPostBodyUtil { */ void setReadPosition(int minus) { pos -= minus; - readerIndex = pos; + readerIndex = getReadPosition(pos); buffer.readerIndex(readerIndex); } + /** + * + * @param index raw index of the array (pos in general) + * @return the value equivalent of raw index to be used in readerIndex(value) + */ + int getReadPosition(int index) { + return index - origPos + readerIndex; + } + void clear() { buffer = null; bytes = null; From 148fc841ac8044e8fcf5f365c0a007d6a241e0b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Br=C3=A9gier?= Date: Tue, 28 Aug 2012 16:39:47 +0300 Subject: [PATCH 6/7] Second part of fix for issue #569 When moving to take into account arrayOffset, it should have been taken into account also in setReadPosition and other places. Fix it now... Also fix mismatch algorithm between SeekAheadOptimize and not SeekAheadOptimize (standard) versions. --- .../multipart/HttpPostRequestDecoder.java | 92 +++++++------------ 1 file changed, 34 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java index 3b674280fe..0cf9837b23 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java @@ -849,13 +849,18 @@ public class HttpPostRequestDecoder { /** * Skip control Characters + * @throws NotEnoughDataDecoderException */ - void skipControlCharacters() { + void skipControlCharacters() throws NotEnoughDataDecoderException { SeekAheadOptimize sao = null; try { sao = new SeekAheadOptimize(undecodedChunk); } catch (SeekAheadNoBackArrayException e) { - skipControlCharactersStandard(undecodedChunk); + try { + skipControlCharactersStandard(undecodedChunk); + } catch (IndexOutOfBoundsException e1) { + throw new NotEnoughDataDecoderException(e1); + } return; } @@ -866,13 +871,13 @@ public class HttpPostRequestDecoder { return; } } - sao.setReadPosition(0); + throw new NotEnoughDataDecoderException("Access out of bounds"); } - static void skipControlCharactersStandard(ChannelBuffer buffer) { + void skipControlCharactersStandard() { for (;;) { - char c = (char) buffer.readUnsignedByte(); + char c = (char) undecodedChunk.readUnsignedByte(); if (!Character.isISOControl(c) && !Character.isWhitespace(c)) { - buffer.readerIndex(buffer.readerIndex() - 1); + undecodedChunk.readerIndex(undecodedChunk.readerIndex() - 1); break; } } @@ -892,7 +897,12 @@ public class HttpPostRequestDecoder { throws ErrorDataDecoderException { // --AaB03x or --AaB03x-- int readerIndex = undecodedChunk.readerIndex(); - skipControlCharacters(); + try { + skipControlCharacters(); + } catch (NotEnoughDataDecoderException e1) { + undecodedChunk.readerIndex(readerIndex); + return null; + } skipOneLine(); String newline; try { @@ -933,9 +943,9 @@ public class HttpPostRequestDecoder { } // read many lines until empty line with newline found! Store all data while (!skipOneLine()) { - skipControlCharacters(); String newline; try { + skipControlCharacters(); newline = readLine(); } catch (NotEnoughDataDecoderException e) { undecodedChunk.readerIndex(readerIndex); @@ -1594,8 +1604,8 @@ public class HttpPostRequestDecoder { // found the decoder limit boolean newLine = true; int index = 0; + int lastrealpos = sao.pos; int lastPosition = undecodedChunk.readerIndex(); - int setReadPosition = -1; boolean found = false; while (sao.pos < sao.limit) { @@ -1606,7 +1616,6 @@ public class HttpPostRequestDecoder { index ++; if (delimiter.length() == index) { found = true; - sao.setReadPosition(0); break; } continue; @@ -1620,23 +1629,16 @@ public class HttpPostRequestDecoder { if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - setReadPosition = sao.pos; - lastPosition = sao.pos - 2; + lastrealpos = sao.pos - 2; } - } else { - // save last valid position - setReadPosition = sao.pos; - lastPosition = sao.pos; } } else if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - setReadPosition = sao.pos; - lastPosition = sao.pos - 1; + lastrealpos = sao.pos - 1; } else { // save last valid position - setReadPosition = sao.pos; - lastPosition = sao.pos; + lastrealpos = sao.pos; } } } else { @@ -1647,30 +1649,20 @@ public class HttpPostRequestDecoder { if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - setReadPosition = sao.pos; - lastPosition = sao.pos - 2; + lastrealpos = sao.pos - 2; } - } else { - // save last valid position - setReadPosition = sao.pos; - lastPosition = sao.pos; } } else if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - setReadPosition = sao.pos; - lastPosition = sao.pos - 1; + lastrealpos = sao.pos - 1; } else { // save last valid position - setReadPosition = sao.pos; - lastPosition = sao.pos; + lastrealpos = sao.pos; } } } - if (setReadPosition > 0) { - sao.pos = setReadPosition; - sao.setReadPosition(0); - } + lastPosition = sao.getReadPosition(lastrealpos); ChannelBuffer buffer = undecodedChunk.slice(readerIndex, lastPosition - readerIndex); if (found) { // found so lastPosition is correct and final @@ -1809,7 +1801,7 @@ public class HttpPostRequestDecoder { boolean newLine = true; int index = 0; int lastPosition = undecodedChunk.readerIndex(); - int setReadPosition = -1; + int lastrealpos = sao.pos; boolean found = false; while (sao.pos < sao.limit) { @@ -1820,7 +1812,6 @@ public class HttpPostRequestDecoder { index ++; if (delimiter.length() == index) { found = true; - sao.setReadPosition(0); break; } continue; @@ -1834,21 +1825,15 @@ public class HttpPostRequestDecoder { if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - lastPosition = sao.pos - 2; - setReadPosition = sao.pos; + lastrealpos = sao.pos - 2; } - } else { - lastPosition = sao.pos; - setReadPosition = sao.pos; } } else if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - lastPosition = sao.pos - 1; - setReadPosition = sao.pos; + lastrealpos = sao.pos - 1; } else { - lastPosition = sao.pos; - setReadPosition = sao.pos; + lastrealpos = sao.pos; } } } else { @@ -1859,28 +1844,19 @@ public class HttpPostRequestDecoder { if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - lastPosition = sao.pos - 2; - setReadPosition = sao.pos; + lastrealpos = sao.pos - 2; } - } else { - lastPosition = sao.pos; - setReadPosition = sao.pos; } } else if (nextByte == HttpConstants.LF) { newLine = true; index = 0; - lastPosition = sao.pos - 1; - setReadPosition = sao.pos; + lastrealpos = sao.pos - 1; } else { - lastPosition = sao.pos; - setReadPosition = sao.pos; + lastrealpos = sao.pos; } } } - if (setReadPosition > 0) { - sao.pos = setReadPosition; - sao.setReadPosition(0); - } + lastPosition = sao.getReadPosition(lastrealpos); if (found) { // found so lastPosition is correct // but position is just after the delimiter (either close delimiter or simple one) From 6bd6be0b09c3f53969a4f6f53041d32822ce6703 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 28 Aug 2012 18:19:31 +0200 Subject: [PATCH 7/7] Fix checkstyle and broken method call. Part of #569 and #572 --- .../netty/handler/codec/http/multipart/HttpPostBodyUtil.java | 5 ----- .../handler/codec/http/multipart/HttpPostRequestDecoder.java | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java index 12a4a0e5e1..4206a430aa 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostBodyUtil.java @@ -130,15 +130,10 @@ final class HttpPostBodyUtil { */ static class SeekAheadOptimize { byte[] bytes; - int readerIndex; - int pos; - int origPos; - int limit; - ChannelBuffer buffer; /** diff --git a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java index 0cf9837b23..79f71017dd 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/multipart/HttpPostRequestDecoder.java @@ -857,7 +857,7 @@ public class HttpPostRequestDecoder { sao = new SeekAheadOptimize(undecodedChunk); } catch (SeekAheadNoBackArrayException e) { try { - skipControlCharactersStandard(undecodedChunk); + skipControlCharactersStandard(); } catch (IndexOutOfBoundsException e1) { throw new NotEnoughDataDecoderException(e1); }