Merge remote branch 'upstream/master'

This commit is contained in:
Jestan Nirojan 2012-03-05 20:56:45 +05:30
commit c40de99083
125 changed files with 3544 additions and 3202 deletions

View File

@ -821,6 +821,81 @@ public final class ChannelBuffers {
return new ReadOnlyChannelBuffer(buffer); return new ReadOnlyChannelBuffer(buffer);
} }
/**
* Create a {@link ChannelBuffer} that holds all the given values as int's
*
*/
public static ChannelBuffer wrapInt(int... values) {
if (values == null || values.length == 0) {
return EMPTY_BUFFER;
}
ChannelBuffer buffer = buffer(values.length * 4);
for (int v: values) {
buffer.writeInt(v);
}
return buffer;
}
/**
* Create a {@link ChannelBuffer} that holds all the given values as short's
*
*/
public static ChannelBuffer wrapShort(int... values) {
if (values == null || values.length == 0) {
return EMPTY_BUFFER;
}
ChannelBuffer buffer = buffer(values.length * 2);
for (int v: values) {
buffer.writeShort(v);
}
return buffer;
}
/**
* Create a {@link ChannelBuffer} that holds all the given values as medium's
*
*/
public static ChannelBuffer wrapMedium(int... values) {
if (values == null || values.length == 0) {
return EMPTY_BUFFER;
}
ChannelBuffer buffer = buffer(values.length * 3);
for (int v: values) {
buffer.writeMedium(v);
}
return buffer;
}
/**
* Create a {@link ChannelBuffer} that holds all the given values as long's
*
*/
public static ChannelBuffer wrapLong(long... values) {
if (values == null || values.length == 0) {
return EMPTY_BUFFER;
}
ChannelBuffer buffer = buffer(values.length * 8);
for (long v: values) {
buffer.writeLong(v);
}
return buffer;
}
/**
* Create a {@link ChannelBuffer} that holds all the given values as boolean's
*
*/
public static ChannelBuffer wrapBoolean(boolean... values) {
if (values == null || values.length == 0) {
return EMPTY_BUFFER;
}
ChannelBuffer buffer = buffer(values.length);
for (boolean v: values) {
buffer.writeBoolean(v);
}
return buffer;
}
/** /**
* Returns a <a href="http://en.wikipedia.org/wiki/Hex_dump">hex dump</a> * Returns a <a href="http://en.wikipedia.org/wiki/Hex_dump">hex dump</a>
* of the specified buffer's readable bytes. * of the specified buffer's readable bytes.

View File

@ -425,4 +425,70 @@ public class ChannelBuffersTest {
// Expected // Expected
} }
} }
@Test
public void testWrapInt() {
ChannelBuffer buffer = ChannelBuffers.wrapInt(1,4);
assertEquals(8, buffer.capacity());
assertEquals(1, buffer.readInt());
assertEquals(4, buffer.readInt());
assertFalse(buffer.readable());
assertEquals(0, ChannelBuffers.wrapInt(null).capacity());
assertEquals(0, ChannelBuffers.wrapInt(new int[0]).capacity());
}
@Test
public void testWrapShort() {
ChannelBuffer buffer = ChannelBuffers.wrapShort(1,4);
assertEquals(4, buffer.capacity());
assertEquals(1, buffer.readShort());
assertEquals(4, buffer.readShort());
assertFalse(buffer.readable());
assertEquals(0, ChannelBuffers.wrapShort(null).capacity());
assertEquals(0, ChannelBuffers.wrapShort(new int[0]).capacity());
}
@Test
public void testWrapMedium() {
ChannelBuffer buffer = ChannelBuffers.wrapMedium(1,4);
assertEquals(6, buffer.capacity());
assertEquals(1, buffer.readMedium());
assertEquals(4, buffer.readMedium());
assertFalse(buffer.readable());
assertEquals(0, ChannelBuffers.wrapMedium(null).capacity());
assertEquals(0, ChannelBuffers.wrapMedium(new int[0]).capacity());
}
@Test
public void testWrapLong() {
ChannelBuffer buffer = ChannelBuffers.wrapLong(1,4);
assertEquals(16, buffer.capacity());
assertEquals(1, buffer.readLong());
assertEquals(4, buffer.readLong());
assertFalse(buffer.readable());
assertEquals(0, ChannelBuffers.wrapLong(null).capacity());
assertEquals(0, ChannelBuffers.wrapLong(new long[0]).capacity());
}
@Test
public void testWrapBoolean() {
ChannelBuffer buffer = ChannelBuffers.wrapBoolean(true, false);
assertEquals(2, buffer.capacity());
assertEquals(true, buffer.readBoolean());
assertEquals(false, buffer.readBoolean());
assertFalse(buffer.readable());
assertEquals(0, ChannelBuffers.wrapBoolean(null).capacity());
assertEquals(0, ChannelBuffers.wrapBoolean(new boolean[0]).capacity());
}
} }

View File

@ -16,17 +16,11 @@
package io.netty.handler.codec.http.websocketx; package io.netty.handler.codec.http.websocketx;
import java.net.URI; import java.net.URI;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map; import java.util.Map;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.CharsetUtil;
/** /**
* Base class for web socket client handshake implementations * Base class for web socket client handshake implementations
@ -111,7 +105,7 @@ public abstract class WebSocketClientHandshaker {
/** /**
* Begins the opening handshake * Begins the opening handshake
* *
* @param channel * @param channel
* Channel * Channel
*/ */
@ -119,7 +113,7 @@ public abstract class WebSocketClientHandshaker {
/** /**
* Validates and finishes the opening handshake initiated by {@link #handshake}}. * Validates and finishes the opening handshake initiated by {@link #handshake}}.
* *
* @param channel * @param channel
* Channel * Channel
* @param response * @param response

View File

@ -15,18 +15,12 @@
*/ */
package io.netty.handler.codec.http.websocketx; package io.netty.handler.codec.http.websocketx;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.CharsetUtil;
/** /**
* Base class for server side web socket opening and closing handshakes * Base class for server side web socket opening and closing handshakes
@ -41,7 +35,7 @@ public abstract class WebSocketServerHandshaker {
/** /**
* Constructor specifying the destination web socket location * Constructor specifying the destination web socket location
* *
* @param version * @param version
* the protocol version * the protocol version
* @param webSocketUrl * @param webSocketUrl
@ -92,7 +86,7 @@ public abstract class WebSocketServerHandshaker {
/** /**
* Performs the opening handshake * Performs the opening handshake
* *
* @param channel * @param channel
* Channel * Channel
* @param req * @param req
@ -102,7 +96,7 @@ public abstract class WebSocketServerHandshaker {
/** /**
* Performs the closing handshake * Performs the closing handshake
* *
* @param channel * @param channel
* Channel * Channel
* @param frame * @param frame
@ -112,7 +106,7 @@ public abstract class WebSocketServerHandshaker {
/** /**
* Selects the first matching supported sub protocol * Selects the first matching supported sub protocol
* *
* @param requestedSubprotocols * @param requestedSubprotocols
* CSV of protocols to be supported. e.g. "chat, superchat" * CSV of protocols to be supported. e.g. "chat, superchat"
* @return First matching supported sub protocol. Null if not found. * @return First matching supported sub protocol. Null if not found.

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.List; import java.util.List;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.Map; import java.util.Map;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.ChannelDownstreamHandler;

View File

@ -13,28 +13,12 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.compression.ZlibDecoder;
import io.netty.handler.codec.embedder.DecoderEmbedder;
import io.netty.handler.codec.frame.FrameDecoder; import io.netty.handler.codec.frame.FrameDecoder;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*; import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
@ -48,8 +32,8 @@ public class SpdyFrameDecoder extends FrameDecoder {
private final int maxFrameSize; private final int maxFrameSize;
private final int maxHeaderSize; private final int maxHeaderSize;
private final DecoderEmbedder<ChannelBuffer> headerBlockDecompressor = private final SpdyHeaderBlockDecompressor headerBlockDecompressor =
new DecoderEmbedder<ChannelBuffer>(new ZlibDecoder(SPDY_DICT)); SpdyHeaderBlockDecompressor.newInstance();
/** /**
* Creates a new instance with the default {@code maxChunkSize (8192)}, * Creates a new instance with the default {@code maxChunkSize (8192)},
@ -129,7 +113,9 @@ public class SpdyFrameDecoder extends FrameDecoder {
int type = getUnsignedShort(buffer, typeOffset); int type = getUnsignedShort(buffer, typeOffset);
buffer.skipBytes(SPDY_HEADER_SIZE); buffer.skipBytes(SPDY_HEADER_SIZE);
return decodeControlFrame(type, flags, buffer.readBytes(dataLength)); int readerIndex = buffer.readerIndex();
buffer.skipBytes(dataLength);
return decodeControlFrame(type, flags, buffer.slice(readerIndex, dataLength));
} else { } else {
// Decode data frame common header // Decode data frame common header
int streamID = getUnsignedInt(buffer, frameOffset); int streamID = getUnsignedInt(buffer, frameOffset);
@ -181,7 +167,7 @@ public class SpdyFrameDecoder extends FrameDecoder {
spdySynStreamFrame.setLast(last); spdySynStreamFrame.setLast(last);
spdySynStreamFrame.setUnidirectional(unid); spdySynStreamFrame.setUnidirectional(unid);
decodeHeaderBlock(spdySynStreamFrame, decompress(data)); decodeHeaderBlock(spdySynStreamFrame, data);
return spdySynStreamFrame; return spdySynStreamFrame;
@ -199,7 +185,7 @@ public class SpdyFrameDecoder extends FrameDecoder {
last = (flags & SPDY_FLAG_FIN) != 0; last = (flags & SPDY_FLAG_FIN) != 0;
spdySynReplyFrame.setLast(last); spdySynReplyFrame.setLast(last);
decodeHeaderBlock(spdySynReplyFrame, decompress(data)); decodeHeaderBlock(spdySynReplyFrame, data);
return spdySynReplyFrame; return spdySynReplyFrame;
@ -299,7 +285,7 @@ public class SpdyFrameDecoder extends FrameDecoder {
SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamID); SpdyHeadersFrame spdyHeadersFrame = new DefaultSpdyHeadersFrame(streamID);
decodeHeaderBlock(spdyHeadersFrame, decompress(data)); decodeHeaderBlock(spdyHeadersFrame, data);
return spdyHeadersFrame; return spdyHeadersFrame;
@ -311,31 +297,38 @@ public class SpdyFrameDecoder extends FrameDecoder {
} }
} }
private ChannelBuffer decompress(ChannelBuffer compressed) throws Exception { private boolean ensureBytes(ChannelBuffer decompressed, int bytes) throws Exception {
if ((compressed.readableBytes() == 2) && if (decompressed.readableBytes() >= bytes) {
(compressed.getShort(compressed.readerIndex()) == 0)) { return true;
return compressed;
} }
headerBlockDecompressor.offer(compressed); decompressed.discardReadBytes();
return headerBlockDecompressor.poll(); headerBlockDecompressor.decode(decompressed);
return decompressed.readableBytes() >= bytes;
} }
private void decodeHeaderBlock(SpdyHeaderBlock headerFrame, ChannelBuffer headerBlock) private void decodeHeaderBlock(SpdyHeaderBlock headerFrame, ChannelBuffer headerBlock)
throws Exception { throws Exception {
if (headerBlock.readableBytes() < 2) { if ((headerBlock.readableBytes() == 2) &&
(headerBlock.getShort(headerBlock.readerIndex()) == 0)) {
return;
}
headerBlockDecompressor.setInput(headerBlock);
ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(8192);
headerBlockDecompressor.decode(decompressed);
if (decompressed.readableBytes() < 2) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received invalid header block"); "Received invalid header block");
} }
int headerSize = 0; int headerSize = 0;
int numEntries = getUnsignedShort(headerBlock, headerBlock.readerIndex()); int numEntries = decompressed.readUnsignedShort();
headerBlock.skipBytes(2);
for (int i = 0; i < numEntries; i ++) { for (int i = 0; i < numEntries; i ++) {
if (headerBlock.readableBytes() < 2) { if (!ensureBytes(decompressed, 2)) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received invalid header block"); "Received invalid header block");
} }
int nameLength = getUnsignedShort(headerBlock, headerBlock.readerIndex()); int nameLength = decompressed.readUnsignedShort();
headerBlock.skipBytes(2);
if (nameLength == 0) { if (nameLength == 0) {
headerFrame.setInvalid(); headerFrame.setInvalid();
return; return;
@ -345,23 +338,22 @@ public class SpdyFrameDecoder extends FrameDecoder {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Header block exceeds " + maxHeaderSize); "Header block exceeds " + maxHeaderSize);
} }
if (headerBlock.readableBytes() < nameLength) { if (!ensureBytes(decompressed, nameLength)) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received invalid header block"); "Received invalid header block");
} }
byte[] nameBytes = new byte[nameLength]; byte[] nameBytes = new byte[nameLength];
headerBlock.readBytes(nameBytes); decompressed.readBytes(nameBytes);
String name = new String(nameBytes, "UTF-8"); String name = new String(nameBytes, "UTF-8");
if (headerFrame.containsHeader(name)) { if (headerFrame.containsHeader(name)) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received duplicate header name: " + name); "Received duplicate header name: " + name);
} }
if (headerBlock.readableBytes() < 2) { if (!ensureBytes(decompressed, 2)) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received invalid header block"); "Received invalid header block");
} }
int valueLength = getUnsignedShort(headerBlock, headerBlock.readerIndex()); int valueLength = decompressed.readUnsignedShort();
headerBlock.skipBytes(2);
if (valueLength == 0) { if (valueLength == 0) {
headerFrame.setInvalid(); headerFrame.setInvalid();
return; return;
@ -371,15 +363,15 @@ public class SpdyFrameDecoder extends FrameDecoder {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Header block exceeds " + maxHeaderSize); "Header block exceeds " + maxHeaderSize);
} }
if (headerBlock.readableBytes() < valueLength) { if (!ensureBytes(decompressed, valueLength)) {
throw new SpdyProtocolException( throw new SpdyProtocolException(
"Received invalid header block"); "Received invalid header block");
} }
byte[] valueBytes = new byte[valueLength]; byte[] valueBytes = new byte[valueLength];
headerBlock.readBytes(valueBytes); decompressed.readBytes(valueBytes);
int index = 0; int index = 0;
int offset = 0; int offset = 0;
while (index < valueBytes.length) { while (index < valueLength) {
while (index < valueBytes.length && valueBytes[index] != (byte) 0) { while (index < valueBytes.length && valueBytes[index] != (byte) 0) {
index ++; index ++;
} }

View File

@ -13,42 +13,28 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.Set; import java.util.Set;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.compression.ZlibEncoder; import io.netty.channel.ChannelStateEvent;
import io.netty.handler.codec.embedder.EncoderEmbedder;
import io.netty.handler.codec.oneone.OneToOneEncoder; import io.netty.handler.codec.oneone.OneToOneEncoder;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
/** /**
* Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}. * Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}.
*/ */
public class SpdyFrameEncoder extends OneToOneEncoder { public class SpdyFrameEncoder extends OneToOneEncoder {
private final EncoderEmbedder<ChannelBuffer> headerBlockCompressor; private volatile boolean finished;
private final SpdyHeaderBlockCompressor headerBlockCompressor;
/** /**
* Creates a new instance with the default {@code compressionLevel (6)}, * Creates a new instance with the default {@code compressionLevel (6)},
@ -63,8 +49,27 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
*/ */
public SpdyFrameEncoder(int compressionLevel, int windowBits, int memLevel) { public SpdyFrameEncoder(int compressionLevel, int windowBits, int memLevel) {
super(); super();
headerBlockCompressor = new EncoderEmbedder<ChannelBuffer>( headerBlockCompressor = SpdyHeaderBlockCompressor.newInstance(compressionLevel, windowBits, memLevel);
new ZlibEncoder(compressionLevel, windowBits, memLevel, SPDY_DICT)); }
@Override
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
if (evt instanceof ChannelStateEvent) {
ChannelStateEvent e = (ChannelStateEvent) evt;
switch (e.getState()) {
case OPEN:
case CONNECTED:
case BOUND:
if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
synchronized (headerBlockCompressor) {
finished = true;
headerBlockCompressor.end();
}
}
}
}
super.handleDownstream(ctx, evt);
} }
@Override @Override
@ -278,7 +283,13 @@ public class SpdyFrameEncoder extends OneToOneEncoder {
if (uncompressed.readableBytes() == 0) { if (uncompressed.readableBytes() == 0) {
return ChannelBuffers.EMPTY_BUFFER; return ChannelBuffers.EMPTY_BUFFER;
} }
headerBlockCompressor.offer(uncompressed); ChannelBuffer compressed = ChannelBuffers.dynamicBuffer();
return headerBlockCompressor.poll(); synchronized (headerBlockCompressor) {
if (!finished) {
headerBlockCompressor.setInput(uncompressed);
headerBlockCompressor.encode(compressed);
}
}
return compressed;
} }
} }

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.List; import java.util.List;

View File

@ -0,0 +1,54 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
import java.util.zip.Deflater;
import io.netty.buffer.ChannelBuffer;
abstract class SpdyHeaderBlockCompressor {
private static final boolean USE_ZLIB;
static {
boolean java7 = false;
try {
Deflater.class.getDeclaredMethod(
"deflate",
new Class<?>[] { byte[].class, int.class, int.class, int.class });
java7 = true;
} catch (Exception e) {
// Ignore
}
USE_ZLIB = java7;
}
static SpdyHeaderBlockCompressor newInstance(
int compressionLevel, int windowBits, int memLevel) {
if (USE_ZLIB) {
return new SpdyHeaderBlockZlibCompressor(compressionLevel);
} else {
return new SpdyHeaderBlockJZlibCompressor(
compressionLevel, windowBits, memLevel);
}
}
abstract void setInput(ChannelBuffer decompressed);
abstract void encode(ChannelBuffer compressed);
abstract void end();
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer;
abstract class SpdyHeaderBlockDecompressor {
static SpdyHeaderBlockDecompressor newInstance() {
return new SpdyHeaderBlockJZlibDecompressor();
}
abstract void setInput(ChannelBuffer compressed);
abstract void decode(ChannelBuffer decompressed);
}

View File

@ -0,0 +1,98 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.compression.CompressionException;
import io.netty.util.internal.jzlib.JZlib;
import io.netty.util.internal.jzlib.ZStream;
class SpdyHeaderBlockJZlibCompressor extends SpdyHeaderBlockCompressor {
private final ZStream z = new ZStream();
public SpdyHeaderBlockJZlibCompressor(int compressionLevel, int windowBits, int memLevel) {
if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)");
}
if (windowBits < 9 || windowBits > 15) {
throw new IllegalArgumentException(
"windowBits: " + windowBits + " (expected: 9-15)");
}
if (memLevel < 1 || memLevel > 9) {
throw new IllegalArgumentException(
"memLevel: " + memLevel + " (expected: 1-9)");
}
int resultCode = z.deflateInit(
compressionLevel, windowBits, memLevel, JZlib.W_ZLIB);
if (resultCode != JZlib.Z_OK) {
throw new CompressionException(
"failed to initialize an SPDY header block deflater: " + resultCode);
} else {
resultCode = z.deflateSetDictionary(SPDY_DICT, SPDY_DICT.length);
if (resultCode != JZlib.Z_OK) {
throw new CompressionException(
"failed to set the SPDY dictionary: " + resultCode);
}
}
}
@Override
public void setInput(ChannelBuffer decompressed) {
byte[] in = new byte[decompressed.readableBytes()];
decompressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
}
@Override
public void encode(ChannelBuffer compressed) {
try {
byte[] out = new byte[(int) Math.ceil(z.next_in.length * 1.001) + 12];
z.next_out = out;
z.next_out_index = 0;
z.avail_out = out.length;
int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
if (resultCode != JZlib.Z_OK) {
throw new CompressionException("compression failure: " + resultCode);
}
if (z.next_out_index != 0) {
compressed.writeBytes(out, 0, z.next_out_index);
}
} finally {
// Deference the external references explicitly to tell the VM that
// the allocated byte arrays are temporary so that the call stack
// can be utilized.
// I'm not sure if the modern VMs do this optimization though.
z.next_in = null;
z.next_out = null;
}
}
@Override
public void end() {
z.deflateEnd();
z.next_in = null;
z.next_out = null;
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
import io.netty.buffer.ChannelBuffer;
import io.netty.handler.codec.compression.CompressionException;
import io.netty.util.internal.jzlib.JZlib;
import io.netty.util.internal.jzlib.ZStream;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
class SpdyHeaderBlockJZlibDecompressor extends SpdyHeaderBlockDecompressor {
private final byte[] out = new byte[8192];
private final ZStream z = new ZStream();
public SpdyHeaderBlockJZlibDecompressor() {
int resultCode;
resultCode = z.inflateInit(JZlib.W_ZLIB);
if (resultCode != JZlib.Z_OK) {
throw new CompressionException("ZStream initialization failure");
}
z.next_out = out;
}
@Override
public void setInput(ChannelBuffer compressed) {
byte[] in = new byte[compressed.readableBytes()];
compressed.readBytes(in);
z.next_in = in;
z.next_in_index = 0;
z.avail_in = in.length;
}
@Override
public void decode(ChannelBuffer decompressed) {
z.next_out_index = 0;
z.avail_out = out.length;
int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
if (resultCode == JZlib.Z_NEED_DICT) {
resultCode = z.inflateSetDictionary(SPDY_DICT, SPDY_DICT.length);
if (resultCode != JZlib.Z_OK) {
throw new CompressionException("ZStream dictionary failure");
}
z.inflate(JZlib.Z_SYNC_FLUSH);
}
if (z.next_out_index > 0) {
decompressed.writeBytes(out, 0, z.next_out_index);
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.spdy;
import java.util.zip.Deflater;
import io.netty.buffer.ChannelBuffer;
import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
class SpdyHeaderBlockZlibCompressor extends SpdyHeaderBlockCompressor {
private final byte[] out = new byte[8192];
private final Deflater compressor;
public SpdyHeaderBlockZlibCompressor(int compressionLevel) {
if (compressionLevel < 0 || compressionLevel > 9) {
throw new IllegalArgumentException(
"compressionLevel: " + compressionLevel + " (expected: 0-9)");
}
compressor = new Deflater(compressionLevel);
compressor.setDictionary(SPDY_DICT);
}
@Override
public void setInput(ChannelBuffer decompressed) {
byte[] in = new byte[decompressed.readableBytes()];
decompressed.readBytes(in);
compressor.setInput(in);
}
@Override
public void encode(ChannelBuffer compressed) {
while (!compressor.needsInput()) {
int numBytes = compressor.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
compressed.writeBytes(out, 0, numBytes);
}
}
@Override
public void end() {
compressor.end();
}
}

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.LinkedList; import java.util.LinkedList;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.ChannelDownstreamHandler;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.HashMap; import java.util.HashMap;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.List; import java.util.List;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.Map; import java.util.Map;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.net.SocketAddress; import java.net.SocketAddress;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.Set; import java.util.Set;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
/** /**

View File

@ -13,22 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** /**
* Encoder, decoder, session handler and their related message types for the SPDY protocol. * Encoder, decoder, session handler and their related message types for the SPDY protocol.
* *

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import static org.junit.Assert.*; import static org.junit.Assert.*;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;

View File

@ -13,21 +13,6 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
/*
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.handler.codec.spdy; package io.netty.handler.codec.spdy;
import java.util.List; import java.util.List;

View File

@ -22,9 +22,11 @@ import java.util.ConcurrentModificationException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import io.netty.channel.Channels;
import io.netty.buffer.ChannelBufferFactory; import io.netty.buffer.ChannelBufferFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -224,6 +226,16 @@ abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
throw new CodecEmbedderException(actualCause); throw new CodecEmbedderException(actualCause);
} }
@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
try {
task.run();
return Channels.succeededFuture(pipeline.getChannel());
} catch (Throwable t) {
return Channels.failedFuture(pipeline.getChannel(), t);
}
}
} }
private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline { private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec.frame; package io.netty.handler.codec.frame;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -74,11 +75,13 @@ public class FixedLengthFrameDecoder extends FrameDecoder {
} }
@Override @Override
protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { protected ChannelBuffer newCumulationBuffer(ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
if (allocateFullBuffer) { if (allocateFullBuffer) {
return ChannelBuffers.dynamicBuffer(frameLength, ctx.getChannel().getConfig().getBufferFactory()); return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), frameLength, ctx.getChannel().getConfig().getBufferFactory());
} }
return super.createCumulationDynamicBuffer(ctx); return super.newCumulationBuffer(ctx, minimumCapacity);
} }
} }

View File

@ -18,6 +18,7 @@ package io.netty.handler.codec.frame;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBufferFactory;
import io.netty.buffer.ChannelBuffers; import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -206,23 +207,20 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
if (input.readable()) { if (input.readable()) {
// seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it // seems like there is something readable left in the input buffer. So create the cumulation buffer and copy the input into it
ChannelBuffer cumulation = cumulation(ctx); (this.cumulation = newCumulationBuffer(ctx, input.readableBytes())).writeBytes(input);
cumulation.writeBytes(input);
} }
} else { } else {
ChannelBuffer cumulation = cumulation(ctx); ChannelBuffer cumulation = this.cumulation;
if (cumulation.readable()) { assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes(); cumulation.discardReadBytes();
cumulation.writeBytes(input); }
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); cumulation.writeBytes(input);
} else { callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); if (!cumulation.readable()) {
if (input.readable()) { this.cumulation = null;
cumulation.writeBytes(input);
}
} }
} }
} }
@Override @Override
@ -303,10 +301,6 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
unfoldAndFireMessageReceived(context, remoteAddress, frame); unfoldAndFireMessageReceived(context, remoteAddress, frame);
} }
if (!cumulation.readable()) {
this.cumulation = null;
}
} }
private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
@ -333,10 +327,10 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
ChannelBuffer cumulation = this.cumulation; ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) { if (cumulation == null) {
return; return;
} else {
this.cumulation = null;
} }
this.cumulation = null;
if (cumulation.readable()) { if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel. // Make sure all frames are read before notifying a closed channel.
callDecode(ctx, ctx.getChannel(), cumulation, null); callDecode(ctx, ctx.getChannel(), cumulation, null);
@ -355,28 +349,18 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
} }
/** /**
* Get the currently used {@link ChannelBuffer} for cumulation or create one in a lazy fashion if none exist yet * Create a new {@link ChannelBuffer} which is used for the cumulation.
* * Be aware that this MUST be a dynamic buffer. Sub-classes may override
* @param ctx the {@link ChannelHandlerContext} for this handler * this to provide a dynamic {@link ChannelBuffer} which has some
* @return buffer the {@link ChannelBuffer} which is used for cumulation * pre-allocated size that better fit their need.
*/ *
private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
ChannelBuffer c = cumulation;
if (c == null) {
c = createCumulationDynamicBuffer(ctx);
cumulation = c;
}
return c;
}
/**
* Create a new {@link ChannelBuffer} which is used for the cumulation. Be aware that this MUST be a dynamic buffer. Sub-classes may override this to provide a
* dynamic {@link ChannelBuffer} which has some prelocated size that better fit their need.
*
* @param ctx {@link ChannelHandlerContext} for this handler * @param ctx {@link ChannelHandlerContext} for this handler
* @return buffer the {@link ChannelBuffer} which is used for cumulation * @return buffer the {@link ChannelBuffer} which is used for cumulation
*/ */
protected ChannelBuffer createCumulationDynamicBuffer(ChannelHandlerContext ctx) { protected ChannelBuffer newCumulationBuffer(
return ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory()); ChannelHandlerContext ctx, int minimumCapacity) {
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
return ChannelBuffers.dynamicBuffer(
factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
} }
} }

View File

@ -289,7 +289,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private ChannelBuffer cumulation; private ChannelBuffer cumulation;
private boolean needsCleanup;
private final boolean unfold; private final boolean unfold;
private ReplayingDecoderBuffer replayable; private ReplayingDecoderBuffer replayable;
private T state; private T state;
@ -430,11 +429,58 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
return; return;
} }
ChannelBuffer cumulation = cumulation(ctx); if (cumulation == null) {
needsCleanup = true; // the cumulation buffer is not created yet so just pass the input
cumulation.discardReadBytes(); // to callDecode(...) method
cumulation.writeBytes(input); this.cumulation = input;
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); replayable = new ReplayingDecoderBuffer(input);
int oldReaderIndex = input.readerIndex();
int inputSize = input.readableBytes();
callDecode(
ctx, e.getChannel(),
input, replayable,
e.getRemoteAddress());
if (input.readable()) {
// seems like there is something readable left in the input buffer
// or decoder wants a replay - create the cumulation buffer and
// copy the input into it
ChannelBuffer cumulation;
if (checkpoint > 0) {
int bytesToPreserve = inputSize - (checkpoint - oldReaderIndex);
cumulation = this.cumulation =
newCumulationBuffer(ctx, bytesToPreserve);
cumulation.writeBytes(input, checkpoint, bytesToPreserve);
} else if (checkpoint == 0) {
cumulation = this.cumulation =
newCumulationBuffer(ctx, inputSize);
cumulation.writeBytes(input, oldReaderIndex, inputSize);
cumulation.readerIndex(input.readerIndex());
} else {
cumulation = this.cumulation =
newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input);
}
replayable = new ReplayingDecoderBuffer(cumulation);
} else {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
} else {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable();
if (cumulation.writableBytes() < input.readableBytes()) {
cumulation.discardReadBytes();
}
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, replayable, e.getRemoteAddress());
if (!cumulation.readable()) {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
}
} }
@Override @Override
@ -455,15 +501,15 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) { while (input.readable()) {
int oldReaderIndex = checkpoint = cumulation.readerIndex(); int oldReaderIndex = checkpoint = input.readerIndex();
Object result = null; Object result = null;
T oldState = state; T oldState = state;
try { try {
result = decode(context, channel, replayable, state); result = decode(context, channel, replayableInput, state);
if (result == null) { if (result == null) {
if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException( throw new IllegalStateException(
"null cannot be returned if no data is consumed and state didn't change."); "null cannot be returned if no data is consumed and state didn't change.");
} else { } else {
@ -476,7 +522,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// Return to the checkpoint (or oldPosition) and retry. // Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint; int checkpoint = this.checkpoint;
if (checkpoint >= 0) { if (checkpoint >= 0) {
cumulation.readerIndex(checkpoint); input.readerIndex(checkpoint);
} else { } else {
// Called by cleanup() - no need to maintain the readerIndex // Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already. // anymore because the buffer has been released already.
@ -489,7 +535,7 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
break; break;
} }
if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { if (oldReaderIndex == input.readerIndex() && oldState == state) {
throw new IllegalStateException( throw new IllegalStateException(
"decode() method must consume at least one byte " + "decode() method must consume at least one byte " +
"if it returned a decoded message (caused by: " + "if it returned a decoded message (caused by: " +
@ -498,11 +544,6 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
// A successful decode // A successful decode
unfoldAndFireMessageReceived(context, result, remoteAddress); unfoldAndFireMessageReceived(context, result, remoteAddress);
if (!cumulation.readable()) {
this.cumulation = null;
replayable = ReplayingDecoderBuffer.EMPTY_BUFFER;
}
} }
} }
@ -528,19 +569,17 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception { throws Exception {
try { try {
if (!needsCleanup) { ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) {
return; return;
} else {
needsCleanup = false;
} }
ChannelBuffer cumulation = this.cumulation;
this.cumulation = null; this.cumulation = null;
replayable.terminate(); replayable.terminate();
if (cumulation != null && cumulation.readable()) { if (cumulation != null && cumulation.readable()) {
// Make sure all data was read before notifying a closed channel. // Make sure all data was read before notifying a closed channel.
callDecode(ctx, e.getChannel(), cumulation, null); callDecode(ctx, e.getChannel(), cumulation, replayable, null);
} }
// Call decodeLast() finally. Please note that decodeLast() is // Call decodeLast() finally. Please note that decodeLast() is
@ -558,19 +597,19 @@ public abstract class ReplayingDecoder<T extends Enum<T>>
} }
} }
private ChannelBuffer cumulation(ChannelHandlerContext ctx) { /**
ChannelBuffer buf = this.cumulation; * Create a new {@link ChannelBuffer} which is used for the cumulation.
if (buf == null) { * Be aware that this MUST be a dynamic buffer. Sub-classes may override
* this to provide a dynamic {@link ChannelBuffer} which has some
if (cumulation == null) { * pre-allocated size that better fit their need.
ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); *
buf = new UnsafeDynamicChannelBuffer(factory); * @param ctx {@link ChannelHandlerContext} for this handler
cumulation = buf; * @return buffer the {@link ChannelBuffer} which is used for cumulation
replayable = new ReplayingDecoderBuffer(buf); */
} else { protected ChannelBuffer newCumulationBuffer(
buf = cumulation; ChannelHandlerContext ctx, int minimumCapacity) {
} ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
} return ChannelBuffers.dynamicBuffer(
return buf; factory.getDefaultOrder(), Math.max(minimumCapacity, 256), factory);
} }
} }

View File

@ -612,6 +612,7 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <E> E cast(Object item) { static <E> E cast(Object item) {
// assert item == null || item.getClass() != Node.class; // assert item == null || item.getClass() != Node.class;
// Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954
return (E) item; return (E) item;
} }
@ -625,6 +626,7 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
* @return an item if matched, else e * @return an item if matched, else e
* @throws NullPointerException if haveData mode but e is null * @throws NullPointerException if haveData mode but e is null
*/ */
@SuppressWarnings("unchecked")
private E xfer(E e, boolean haveData, int how, long nanos) { private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && e == null) { if (haveData && e == null) {
throw new NullPointerException(); throw new NullPointerException();
@ -653,7 +655,8 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
} }
} }
LockSupport.unpark(p.waiter); LockSupport.unpark(p.waiter);
return LegacyLinkedTransferQueue.cast(item); // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954
return (E) LegacyLinkedTransferQueue.cast(item);
} }
} }
Node n = p.next; Node n = p.next;
@ -726,6 +729,7 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
* @param nanos timeout in nanosecs, used only if timed is true * @param nanos timeout in nanosecs, used only if timed is true
* @return matched item, or e if unmatched on interrupt or timeout * @return matched item, or e if unmatched on interrupt or timeout
*/ */
@SuppressWarnings("unchecked")
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0L; long lastTime = timed ? System.nanoTime() : 0L;
Thread w = Thread.currentThread(); Thread w = Thread.currentThread();
@ -737,7 +741,8 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
if (item != e) { // matched if (item != e) { // matched
// assert item != s; // assert item != s;
s.forgetContents(); // avoid garbage s.forgetContents(); // avoid garbage
return LegacyLinkedTransferQueue.cast(item); // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954
return (E) LegacyLinkedTransferQueue.cast(item);
} }
if ((w.isInterrupted() || timed && nanos <= 0) && if ((w.isInterrupted() || timed && nanos <= 0) &&
s.casItem(e, s)) { // cancel s.casItem(e, s)) { // cancel
@ -820,12 +825,14 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
* Returns the item in the first unmatched node with isData; or * Returns the item in the first unmatched node with isData; or
* null if none. Used by peek. * null if none. Used by peek.
*/ */
@SuppressWarnings("unchecked")
private E firstDataItem() { private E firstDataItem() {
for (Node p = head; p != null; p = succ(p)) { for (Node p = head; p != null; p = succ(p)) {
Object item = p.item; Object item = p.item;
if (p.isData) { if (p.isData) {
if (item != null && item != p) { if (item != null && item != p) {
return LegacyLinkedTransferQueue.cast(item); // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954
return (E) LegacyLinkedTransferQueue.cast(item);
} }
} }
else if (item == null) { else if (item == null) {
@ -870,6 +877,7 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
/** /**
* Moves to next node after prev, or first node if prev null. * Moves to next node after prev, or first node if prev null.
*/ */
@SuppressWarnings("unchecked")
private void advance(Node prev) { private void advance(Node prev) {
lastPred = lastRet; lastPred = lastRet;
lastRet = prev; lastRet = prev;
@ -878,7 +886,8 @@ public class LegacyLinkedTransferQueue<E> extends AbstractQueue<E>
Object item = p.item; Object item = p.item;
if (p.isData) { if (p.isData) {
if (item != null && item != p) { if (item != null && item != p) {
nextItem = LegacyLinkedTransferQueue.cast(item); // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954
nextItem = (E) LegacyLinkedTransferQueue.cast(item);
nextNode = p; nextNode = p;
return; return;
} }

View File

@ -81,6 +81,7 @@ import java.util.concurrent.locks.LockSupport;
* @param <E> the type of elements held in this collection * @param <E> the type of elements held in this collection
*/ */
@SuppressWarnings("restriction")
public class LinkedTransferQueue<E> extends AbstractQueue<E> public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable { implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L; private static final long serialVersionUID = -3223113410248163686L;
@ -529,8 +530,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
return false; return false;
} }
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics // Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE; private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset; private static final long itemOffset;

View File

@ -112,4 +112,8 @@ final class Adler32 {
} }
return s2 << 16 | s1; return s2 << 16 | s1;
} }
private Adler32() {
// Utility Class
}
} }

View File

@ -92,4 +92,8 @@ final class CRC32 {
crc32 ^= 0xffffffff; crc32 ^= 0xffffffff;
return crc32; return crc32;
} }
private CRC32() {
// Utility Class
}
} }

View File

@ -96,7 +96,7 @@ final class Deflate {
"data error", // Z_DATA_ERROR (-3) "data error", // Z_DATA_ERROR (-3)
"insufficient memory", // Z_MEM_ERROR (-4) "insufficient memory", // Z_MEM_ERROR (-4)
"buffer error", // Z_BUF_ERROR (-5) "buffer error", // Z_BUF_ERROR (-5)
"incompatible version",// Z_VERSION_ERROR (-6) "incompatible version", // Z_VERSION_ERROR (-6)
"" }; "" };
// block not completed, need more input or more output // block not completed, need more input or more output
@ -342,7 +342,7 @@ final class Deflate {
// Scan a literal or distance tree to determine the frequencies of the codes // Scan a literal or distance tree to determine the frequencies of the codes
// in the bit length tree. // in the bit length tree.
private void scan_tree(short[] tree,// the tree to be scanned private void scan_tree(short[] tree, // the tree to be scanned
int max_code // and its largest code of non zero frequency int max_code // and its largest code of non zero frequency
) { ) {
int n; // iterates over all tree elements int n; // iterates over all tree elements
@ -437,7 +437,7 @@ final class Deflate {
// Send a literal or distance tree in compressed form, using the codes in // Send a literal or distance tree in compressed form, using the codes in
// bl_tree. // bl_tree.
private void send_tree(short[] tree,// the tree to be sent private void send_tree(short[] tree, // the tree to be sent
int max_code // and its largest code of non zero frequency int max_code // and its largest code of non zero frequency
) { ) {
int n; // iterates over all tree elements int n; // iterates over all tree elements
@ -514,7 +514,7 @@ final class Deflate {
private void send_code(int c, short[] tree) { private void send_code(int c, short[] tree) {
int c2 = c * 2; int c2 = c * 2;
send_bits((tree[c2] & 0xffff), (tree[c2 + 1] & 0xffff)); send_bits(tree[c2] & 0xffff, tree[c2 + 1] & 0xffff);
} }
private void send_bits(int value, int length) { private void send_bits(int value, int length) {
@ -804,7 +804,7 @@ final class Deflate {
int stored_len, // length of input block int stored_len, // length of input block
boolean eof // true if this is the last block for a file boolean eof // true if this is the last block for a file
) { ) {
int opt_lenb, static_lenb;// opt_len and static_len in bytes int opt_lenb, static_lenb; // opt_len and static_len in bytes
int max_blindex = 0; // index of last bit length code of non zero freq int max_blindex = 0; // index of last bit length code of non zero freq
// Build the Huffman trees unless a stored block is forced // Build the Huffman trees unless a stored block is forced

View File

@ -61,7 +61,7 @@ final class InfBlocks {
private static final int TYPE = 0; // get type bits (3, including end bit) private static final int TYPE = 0; // get type bits (3, including end bit)
private static final int LENS = 1; // get lengths for stored private static final int LENS = 1; // get lengths for stored
private static final int STORED = 2;// processing stored block private static final int STORED = 2; // processing stored block
private static final int TABLE = 3; // get table lengths private static final int TABLE = 3; // get table lengths
private static final int BTREE = 4; // get bit lengths tree for a dynamic block private static final int BTREE = 4; // get bit lengths tree for a dynamic block
private static final int DTREE = 5; // get length, distance trees for a dynamic block private static final int DTREE = 5; // get length, distance trees for a dynamic block
@ -124,16 +124,15 @@ final class InfBlocks {
int m; // bytes to end of window or read pointer int m; // bytes to end of window or read pointer
// copy input/output information to locals (UPDATE macro restores) // copy input/output information to locals (UPDATE macro restores)
{
p = z.next_in_index; p = z.next_in_index;
n = z.avail_in; n = z.avail_in;
b = bitb; b = bitb;
k = bitk; k = bitk;
}
{ q = write;
q = write; m = q < read? read - q - 1 : end - q;
m = q < read? read - q - 1 : end - q;
}
// process input based on current state // process input based on current state
while (true) { while (true) {
@ -161,20 +160,17 @@ final class InfBlocks {
switch (t >>> 1) { switch (t >>> 1) {
case 0: // stored case 0: // stored
{
b >>>= 3; b >>>= 3;
k -= 3; k -= 3;
}
t = k & 7; // go to byte boundary t = k & 7; // go to byte boundary
{ b >>>= t;
b >>>= t; k -= t;
k -= t;
}
mode = LENS; // get length of stored block mode = LENS; // get length of stored block
break; break;
case 1: // fixed case 1: // fixed
{
int[] bl = new int[1]; int[] bl = new int[1];
int[] bd = new int[1]; int[] bd = new int[1];
int[][] tl = new int[1][]; int[][] tl = new int[1][];
@ -182,30 +178,24 @@ final class InfBlocks {
InfTree.inflate_trees_fixed(bl, bd, tl, td); InfTree.inflate_trees_fixed(bl, bd, tl, td);
codes.init(bl[0], bd[0], tl[0], 0, td[0], 0); codes.init(bl[0], bd[0], tl[0], 0, td[0], 0);
}
{ b >>>= 3;
b >>>= 3; k -= 3;
k -= 3;
}
mode = CODES; mode = CODES;
break; break;
case 2: // dynamic case 2: // dynamic
{
b >>>= 3; b >>>= 3;
k -= 3; k -= 3;
}
mode = TABLE; mode = TABLE;
break; break;
case 3: // illegal case 3: // illegal
{
b >>>= 3; b >>>= 3;
k -= 3; k -= 3;
}
mode = BAD; mode = BAD;
z.msg = "invalid block type"; z.msg = "invalid block type";
r = JZlib.Z_DATA_ERROR; r = JZlib.Z_DATA_ERROR;
@ -352,10 +342,9 @@ final class InfBlocks {
} }
} }
{
b >>>= 14; b >>>= 14;
k -= 14; k -= 14;
}
index = 0; index = 0;
mode = BTREE; mode = BTREE;
@ -380,10 +369,9 @@ final class InfBlocks {
blens[border[index ++]] = b & 7; blens[border[index ++]] = b & 7;
{ b >>>= 3;
b >>>= 3; k -= 3;
k -= 3;
}
} }
while (index < 19) { while (index < 19) {
@ -505,36 +493,36 @@ final class InfBlocks {
} }
tb[0] = -1; tb[0] = -1;
{
int[] bl = new int[1];
int[] bd = new int[1];
int[] tl = new int[1];
int[] td = new int[1];
bl[0] = 9; // must be <= 9 for lookahead assumptions
bd[0] = 6; // must be <= 9 for lookahead assumptions
t = table; int[] bl = new int[1];
t = inftree.inflate_trees_dynamic(257 + (t & 0x1f), int[] bd = new int[1];
1 + (t >> 5 & 0x1f), blens, bl, bd, tl, td, hufts, int[] tl = new int[1];
z); int[] td = new int[1];
bl[0] = 9; // must be <= 9 for lookahead assumptions
bd[0] = 6; // must be <= 9 for lookahead assumptions
if (t != JZlib.Z_OK) { t = table;
if (t == JZlib.Z_DATA_ERROR) { t = inftree.inflate_trees_dynamic(257 + (t & 0x1f),
blens = null; 1 + (t >> 5 & 0x1f), blens, bl, bd, tl, td, hufts,
mode = BAD; z);
}
r = t;
bitb = b; if (t != JZlib.Z_OK) {
bitk = k; if (t == JZlib.Z_DATA_ERROR) {
z.avail_in = n; blens = null;
z.total_in += p - z.next_in_index; mode = BAD;
z.next_in_index = p;
write = q;
return inflate_flush(z, r);
} }
codes.init(bl[0], bd[0], hufts, tl[0], hufts, td[0]); r = t;
bitb = b;
bitk = k;
z.avail_in = n;
z.total_in += p - z.next_in_index;
z.next_in_index = p;
write = q;
return inflate_flush(z, r);
} }
codes.init(bl[0], bd[0], hufts, tl[0], hufts, td[0]);
mode = CODES; mode = CODES;
case CODES: case CODES:
bitb = b; bitb = b;

View File

@ -71,7 +71,7 @@ final class InfCodes {
// mode dependent information // mode dependent information
private int len; private int len;
private int[] tree; // pointer into tree private int[] tree; // pointer into tree
private int tree_index = 0; private int tree_index;
private int need; // bits needed private int need; // bits needed
private int lit; private int lit;
// if EXT or COPY, where and how much // if EXT or COPY, where and how much

View File

@ -180,8 +180,8 @@ final class InfTree {
int[] e, // list of extra bits for non-simple codes int[] e, // list of extra bits for non-simple codes
int[] t, // result: starting table int[] t, // result: starting table
int[] m, // maximum lookup bits, returns actual int[] m, // maximum lookup bits, returns actual
int[] hp,// space for trees int[] hp, // space for trees
int[] hn,// hufts used in space int[] hn, // hufts used in space
int[] v // working area: values in order of bit length int[] v // working area: values in order of bit length
) { ) {
// Given a list of code lengths and a maximum table size, make a set of // Given a list of code lengths and a maximum table size, make a set of
@ -437,7 +437,7 @@ final class InfTree {
static int inflate_trees_fixed(int[] bl, //literal desired/actual bit depth static int inflate_trees_fixed(int[] bl, //literal desired/actual bit depth
int[] bd, //distance desired/actual bit depth int[] bd, //distance desired/actual bit depth
int[][] tl,//literal/length tree result int[][] tl, //literal/length tree result
int[][] td //distance tree result int[][] td //distance tree result
) { ) {
bl[0] = fixed_bl; bl[0] = fixed_bl;

View File

@ -76,7 +76,7 @@ final class Inflate {
private static final int GZIP_FNAME = 21; private static final int GZIP_FNAME = 21;
private static final int GZIP_FCOMMENT = 22; private static final int GZIP_FCOMMENT = 22;
private static final int GZIP_FHCRC = 23; private static final int GZIP_FHCRC = 23;
private static final int GZIP_CRC32= 24; private static final int GZIP_CRC32 = 24;
private static final int GZIP_ISIZE = 25; private static final int GZIP_ISIZE = 25;
private int mode; // current inflate mode private int mode; // current inflate mode
@ -305,7 +305,7 @@ final class Inflate {
break; break;
} else if (z.istate.wrapperType == WrapperType.ZLIB) { } else if (z.istate.wrapperType == WrapperType.ZLIB) {
z.istate.mode = CHECK4; z.istate.mode = CHECK4;
} else if (z.istate.wrapperType == WrapperType.GZIP){ } else if (z.istate.wrapperType == WrapperType.GZIP) {
gzipCRC32 = 0; gzipCRC32 = 0;
gzipISize = 0; gzipISize = 0;
gzipBytesToRead = 4; gzipBytesToRead = 4;

View File

@ -105,4 +105,8 @@ public final class JZlib {
enum WrapperType { enum WrapperType {
NONE, ZLIB, GZIP, ZLIB_OR_NONE NONE, ZLIB, GZIP, ZLIB_OR_NONE
} }
private JZlib() {
// Utility class
}
} }

View File

@ -13,19 +13,9 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.handler.codec.replay;
import io.netty.buffer.ChannelBufferFactory; /**
import io.netty.buffer.DynamicChannelBuffer; * <em>Internal-use-only</em> utilities which is not allowed to be used
* outside Netty.
class UnsafeDynamicChannelBuffer extends DynamicChannelBuffer { */
package io.netty.util.internal.jzlib;
UnsafeDynamicChannelBuffer(ChannelBufferFactory factory) {
super(factory.getDefaultOrder(), 256, factory);
}
@Override
protected void checkReadableBytes(int minReaderRemaining) {
// Do not check here - ReplayingDecoderBuffer will check.
}
}

View File

@ -22,15 +22,12 @@ import io.netty.channel.ChannelPipelineFactory;
import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
public class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory { public class HttpSnoopServerPipelineFactory implements ChannelPipelineFactory {
@Override @Override
public ChannelPipeline getPipeline() throws Exception { public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation. // Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline(); ChannelPipeline pipeline = pipeline();
pipeline.addLast("log", new LoggingHandler(InternalLogLevel.INFO));
// Uncomment the following line if you want HTTPS // Uncomment the following line if you want HTTPS
//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();

View File

@ -17,9 +17,6 @@ package io.netty.example.http.websocketx.autobahn;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory; import io.netty.channel.socket.nio.NioServerSocketChannelFactory;

View File

@ -17,23 +17,20 @@ package io.netty.example.http.websocketx.server;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory; import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
* A HTTP server which serves Web Socket requests at: * A HTTP server which serves Web Socket requests at:
* *
* http://localhost:8080/websocket * http://localhost:8080/websocket
* *
* Open your browser at http://localhost:8080/, then the demo page will be loaded and a Web Socket connection will be * Open your browser at http://localhost:8080/, then the demo page will be loaded and a Web Socket connection will be
* made automatically. * made automatically.
* *
* This server illustrates support for the different web socket specification versions and will work with: * This server illustrates support for the different web socket specification versions and will work with:
* *
* <ul> * <ul>
* <li>Safari 5+ (draft-ietf-hybi-thewebsocketprotocol-00) * <li>Safari 5+ (draft-ietf-hybi-thewebsocketprotocol-00)
* <li>Chrome 6-13 (draft-ietf-hybi-thewebsocketprotocol-00) * <li>Chrome 6-13 (draft-ietf-hybi-thewebsocketprotocol-00)

View File

@ -17,23 +17,20 @@ package io.netty.example.http.websocketx.sslserver;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory; import io.netty.channel.socket.nio.NioServerSocketChannelFactory;
/** /**
* A HTTP server which serves Web Socket requests at: * A HTTP server which serves Web Socket requests at:
* *
* https://localhost:8081/websocket * https://localhost:8081/websocket
* *
* Open your browser at https://localhost:8081/, then the demo page will be loaded and a Web Socket connection will be * Open your browser at https://localhost:8081/, then the demo page will be loaded and a Web Socket connection will be
* made automatically. * made automatically.
* *
* This server illustrates support for the different web socket specification versions and will work with: * This server illustrates support for the different web socket specification versions and will work with:
* *
* <ul> * <ul>
* <li>Safari 5+ (draft-ietf-hybi-thewebsocketprotocol-00) * <li>Safari 5+ (draft-ietf-hybi-thewebsocketprotocol-00)
* <li>Chrome 6-13 (draft-ietf-hybi-thewebsocketprotocol-00) * <li>Chrome 6-13 (draft-ietf-hybi-thewebsocketprotocol-00)

View File

@ -87,7 +87,6 @@ public class SctpClient {
// Parse options. // Parse options.
final String host = args[0]; final String host = args[0];
final int port = Integer.parseInt(args[1]); final int port = Integer.parseInt(args[1]);
final int firstMessageSize;
new SctpClient(host, port).run(); new SctpClient(host, port).run();
} }

View File

@ -45,7 +45,7 @@ public class SecureChatClientPipelineFactory implements
SSLEngine engine = SSLEngine engine =
SecureChatSslContextFactory.getClientContext().createSSLEngine(); SecureChatSslContextFactory.getClientContext().createSSLEngine();
//engine.setUseClientMode(true); engine.setUseClientMode(true);
pipeline.addLast("ssl", new SslHandler(engine)); pipeline.addLast("ssl", new SslHandler(engine));

View File

@ -110,25 +110,40 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
private final Executor executor; private final Executor executor;
private final boolean handleDownstream; private final boolean handleDownstream;
private final boolean handleUpstream;
/**
* Creates a new instance with the specified {@link Executor} which only handles upstream events.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor) {
this(executor, false, true);
}
/**
* Use {@link #ExecutionHandler(Executor, boolean, boolean)}
*
* {@link Deprecated}
*/
@Deprecated
public ExecutionHandler(Executor executor, boolean handleDownstream) {
this(executor, handleDownstream, true);
}
/** /**
* Creates a new instance with the specified {@link Executor}. * Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure. * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/ */
public ExecutionHandler(Executor executor) { public ExecutionHandler(Executor executor, boolean handleDownstream, boolean handleUpstream) {
this(executor, false);
}
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor, boolean handleDownstream) {
if (executor == null) { if (executor == null) {
throw new NullPointerException("executor"); throw new NullPointerException("executor");
} }
if (!handleDownstream && !handleUpstream) {
throw new IllegalArgumentException("You must handle at least handle one event type");
}
this.executor = executor; this.executor = executor;
this.handleDownstream = handleDownstream; this.handleDownstream = handleDownstream;
this.handleUpstream = handleUpstream;
} }
/** /**
@ -154,7 +169,11 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
@Override @Override
public void handleUpstream( public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception { ChannelHandlerContext context, ChannelEvent e) throws Exception {
executor.execute(new ChannelUpstreamEventRunnable(context, e)); if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e));
} else {
context.sendUpstream(e);
}
} }
@Override @Override

View File

@ -419,10 +419,18 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
//System.out.println("READABLE"); //System.out.println("READABLE");
ChannelHandlerContext ctx = eventTask.getContext(); ChannelHandlerContext ctx = eventTask.getContext();
if (ctx.getHandler() instanceof ExecutionHandler) { if (ctx.getHandler() instanceof ExecutionHandler) {
// readSuspended = false; // check if the attachment was set as this means that we suspend the channel from reads. This only works when
ctx.setAttachment(null); // this pool is used with ExecutionHandler but I guess thats good enough for us.
//
// See #215
if (ctx.getAttachment() != null) {
// readSuspended = false;
ctx.setAttachment(null);
channel.setReadable(true);
}
} else {
channel.setReadable(true);
} }
channel.setReadable(true);
} }
} }
} }

View File

@ -0,0 +1,165 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.execution;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
* {@link Executor} which should be used for downstream {@link ChannelEvent}'s. This implementation will take care of preserve the order of the events in a {@link Channel}.
* If you don't need to preserve the order just use one of the {@link Executor} implementations provided by the static methods of {@link Executors}.
* <br>
* <br>
*
* For more informations about how the order is preserved see {@link OrderedMemoryAwareThreadPoolExecutor}
*
*/
public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor {
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
*/
public OrderedDownstreamThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, 0L, 0L);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public OrderedDownstreamThreadPoolExecutor(
int corePoolSize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, 0L, 0L, keepAliveTime, unit);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public OrderedDownstreamThreadPoolExecutor(
int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, 0L, 0L,
keepAliveTime, unit, threadFactory);
}
/**
* Return <code>null</code>
*/
@Override
public ObjectSizeEstimator getObjectSizeEstimator() {
return null;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Returns <code>0L</code>
*/
@Override
public long getMaxChannelMemorySize() {
return 0L;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setMaxChannelMemorySize(long maxChannelMemorySize) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Returns <code>0L</code>
*/
@Override
public long getMaxTotalMemorySize() {
return 0L;
}
/**
* Throws {@link UnsupportedOperationException} as there is not support for limit the memory size in this implementation
*/
@Override
public void setMaxTotalMemorySize(long maxTotalMemorySize) {
throw new UnsupportedOperationException("Not supported by this implementation");
}
/**
* Return <code>false</code> as we not need to cound the memory in this implementation
*/
@Override
protected boolean shouldCount(Runnable task) {
return false;
}
@Override
public void execute(Runnable command) {
// check if the Runnable was of an unsupported type
if (command instanceof ChannelUpstreamEventRunnable) {
throw new RejectedExecutionException("command must be enclosed with an downstream event.");
}
doExecute(command);
}
@Override
protected Executor getChildExecutor(ChannelEvent e) {
final Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key);
if (executor == null) {
executor = new ChildExecutor();
Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
if (oldExecutor != null) {
executor = oldExecutor;
} else {
// register a listener so that the ChildExecutor will get removed once the channel was closed
e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
removeChildExecutor(key);
}
});
}
}
return executor;
}
}

View File

@ -136,7 +136,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
// TODO Make OMATPE focus on the case where Channel is the key. // TODO Make OMATPE focus on the case where Channel is the key.
// Add a new less-efficient TPE that allows custom key. // Add a new less-efficient TPE that allows custom key.
private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap(); protected final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
/** /**
* Creates a new instance. * Creates a new instance.
@ -242,7 +242,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
} }
private Executor getChildExecutor(ChannelEvent e) { protected Executor getChildExecutor(ChannelEvent e) {
Object key = getChildExecutorKey(e); Object key = getChildExecutorKey(e);
Executor executor = childExecutors.get(key); Executor executor = childExecutors.get(key);
if (executor == null) { if (executor == null) {
@ -278,7 +278,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
afterExecute(r, t); afterExecute(r, t);
} }
private final class ChildExecutor implements Executor, Runnable { protected final class ChildExecutor implements Executor, Runnable {
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean(); private final AtomicBoolean isRunning = new AtomicBoolean();

View File

@ -574,15 +574,15 @@ public class SslHandler extends FrameDecoder
if (tls) { if (tls) {
// SSLv3 or TLS - Check ProtocolVersion // SSLv3 or TLS - Check ProtocolVersion
int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1); int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
if (majorVersion >= 3 && majorVersion < 10) { if (majorVersion == 3) {
// SSLv3 or TLS // SSLv3 or TLS
packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5; packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
if (packetLength <= 5) { if (packetLength <= 5) {
// Neither SSLv2 or TLSv1 (i.e. SSLv2 or bad data) // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
tls = false; tls = false;
} }
} else { } else {
// Neither SSLv2 or TLSv1 (i.e. SSLv2 or bad data) // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
tls = false; tls = false;
} }
} }
@ -594,7 +594,7 @@ public class SslHandler extends FrameDecoder
buffer.readerIndex()) & 0x80) != 0 ? 2 : 3; buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
int majorVersion = buffer.getUnsignedByte( int majorVersion = buffer.getUnsignedByte(
buffer.readerIndex() + headerLength + 1); buffer.readerIndex() + headerLength + 1);
if (majorVersion >= 2 && majorVersion < 10) { if (majorVersion == 2 || majorVersion == 3) {
// SSLv2 // SSLv2
if (headerLength == 2) { if (headerLength == 2) {
packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2; packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
@ -972,7 +972,7 @@ public class SslHandler extends FrameDecoder
outAppBuf.flip(); outAppBuf.flip();
if (outAppBuf.hasRemaining()) { if (outAppBuf.hasRemaining()) {
ChannelBuffer frame = ChannelBuffers.buffer(outAppBuf.remaining()); ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());
frame.writeBytes(outAppBuf.array(), 0, frame.capacity()); frame.writeBytes(outAppBuf.array(), 0, frame.capacity());
return frame; return frame;
} else { } else {

View File

@ -90,7 +90,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
} }
try { try {
flush(ctx); flush(ctx, false);
} catch (Exception e) { } catch (Exception e) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while sending chunks.", e); logger.warn("Unexpected exception while sending chunks.", e);
@ -112,10 +112,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
final Channel channel = ctx.getChannel(); final Channel channel = ctx.getChannel();
if (channel.isWritable()) { if (channel.isWritable()) {
this.ctx = ctx; this.ctx = ctx;
flush(ctx); flush(ctx, false);
} else if (!channel.isConnected()) { } else if (!channel.isConnected()) {
this.ctx = ctx; this.ctx = ctx;
discard(ctx); discard(ctx, false);
} }
} }
@ -127,12 +127,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
switch (cse.getState()) { switch (cse.getState()) {
case INTEREST_OPS: case INTEREST_OPS:
// Continue writing when the channel becomes writable. // Continue writing when the channel becomes writable.
flush(ctx); flush(ctx, true);
break; break;
case OPEN: case OPEN:
if (!Boolean.TRUE.equals(cse.getValue())) { if (!Boolean.TRUE.equals(cse.getValue())) {
// Fail all pending writes // Fail all pending writes
discard(ctx); discard(ctx, true);
} }
break; break;
} }
@ -140,7 +140,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
private void discard(ChannelHandlerContext ctx) { private void discard(ChannelHandlerContext ctx, boolean fireNow) {
ClosedChannelException cause = null; ClosedChannelException cause = null;
boolean fireExceptionCaught = false; boolean fireExceptionCaught = false;
@ -169,20 +169,22 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
} }
currentEvent.getFuture().setFailure(cause); currentEvent.getFuture().setFailure(cause);
fireExceptionCaught = true; fireExceptionCaught = true;
currentEvent = null;
} }
if (fireExceptionCaught) { if (fireExceptionCaught) {
Channels.fireExceptionCaught(ctx.getChannel(), cause); if (fireNow) {
fireExceptionCaught(ctx, cause);
} else {
fireExceptionCaughtLater(ctx, cause);
}
} }
} }
private synchronized void flush(ChannelHandlerContext ctx) throws Exception { private synchronized void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
final Channel channel = ctx.getChannel(); final Channel channel = ctx.getChannel();
if (!channel.isConnected()) { if (!channel.isConnected()) {
discard(ctx); discard(ctx, fireNow);
} }
while (channel.isWritable()) { while (channel.isWritable()) {
@ -220,7 +222,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
this.currentEvent = null; this.currentEvent = null;
currentEvent.getFuture().setFailure(t); currentEvent.getFuture().setFailure(t);
fireExceptionCaught(ctx, t); if (fireNow) {
fireExceptionCaught(ctx, t);
} else {
fireExceptionCaughtLater(ctx, t);
}
closeInput(chunks); closeInput(chunks);
break; break;
@ -262,7 +268,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns
} }
if (!channel.isConnected()) { if (!channel.isConnected()) {
discard(ctx); discard(ctx, fireNow);
break; break;
} }
} }

View File

@ -154,7 +154,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler
} }
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
Channels.fireExceptionCaught(ctx, EXCEPTION); Channels.fireExceptionCaughtLater(ctx, EXCEPTION);
} }
private final class WriteTimeoutTask implements TimerTask { private final class WriteTimeoutTask implements TimerTask {

Binary file not shown.

48
pom.xml
View File

@ -212,12 +212,12 @@
<configuration> <configuration>
<rules> <rules>
<requireJavaVersion> <requireJavaVersion>
<!-- Enforce java 1.6 as minimum for compiling --> <!-- Enforce java 1.7 as minimum for compiling -->
<!-- This is needed because of the Unsafe detection code --> <!-- This is needed because of java.util.zip.Deflater -->
<version>[1.6.0,)</version> <version>[1.7.0,)</version>
</requireJavaVersion> </requireJavaVersion>
<requireMavenVersion> <requireMavenVersion>
<version>[3.0.3,)</version> <version>[3.0.2,)</version>
</requireMavenVersion> </requireMavenVersion>
</rules> </rules>
</configuration> </configuration>
@ -237,6 +237,32 @@
<showWarnings>true</showWarnings> <showWarnings>true</showWarnings>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<!-- ensure that only methods available in java 1.6 can
be used even when compiling with java 1.7+ -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<version>1.7</version>
<configuration>
<signature>
<groupId>org.codehaus.mojo.signature</groupId>
<artifactId>java16</artifactId>
<version>1.0</version>
</signature>
<ignores>
<ignore>sun.misc.Unsafe</ignore>
<ignore>java.util.zip.Deflater</ignore>
</ignores>
</configuration>
<executions>
<execution>
<phase>process-classes</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<artifactId>maven-checkstyle-plugin</artifactId> <artifactId>maven-checkstyle-plugin</artifactId>
<version>2.8</version> <version>2.8</version>
@ -260,10 +286,22 @@
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>netty-build</artifactId> <artifactId>netty-build</artifactId>
<version>6</version> <version>7</version>
</dependency> </dependency>
</dependencies> </dependencies>
</plugin> </plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12</version>
<configuration>
<forkMode>once</forkMode>
<excludes>
<exclude>**/Abstract*</exclude>
<exclude>**/TestUtil*</exclude>
</excludes>
<runOrder>random</runOrder>
</configuration>
</plugin>
</plugins> </plugins>
<!-- Workaround for the 'M2E plugin execution not covered' problem. <!-- Workaround for the 'M2E plugin execution not covered' problem.

View File

@ -35,6 +35,14 @@
<artifactId>netty-codec-http</artifactId> <artifactId>netty-codec-http</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- Servlet API - completely optional -->
<!-- Used for HTTP tunneling transport -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<optional>true</optional>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,248 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import java.io.EOFException;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.net.SocketAddress;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channels;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.local.DefaultLocalClientChannelFactory;
import io.netty.channel.local.LocalAddress;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/**
* An {@link HttpServlet} that proxies an incoming data to the actual server
* and vice versa. Please refer to the
* <a href="package-summary.html#package_description">package summary</a> for
* the detailed usage.
* @apiviz.landmark
*/
public class HttpTunnelingServlet extends HttpServlet {
private static final long serialVersionUID = 4259910275899756070L;
private static final String ENDPOINT = "endpoint";
static final InternalLogger logger = InternalLoggerFactory.getInstance(HttpTunnelingServlet.class);
private volatile SocketAddress remoteAddress;
private volatile ChannelFactory channelFactory;
@Override
public void init() throws ServletException {
ServletConfig config = getServletConfig();
String endpoint = config.getInitParameter(ENDPOINT);
if (endpoint == null) {
throw new ServletException("init-param '" + ENDPOINT + "' must be specified.");
}
try {
remoteAddress = parseEndpoint(endpoint.trim());
} catch (ServletException e) {
throw e;
} catch (Exception e) {
throw new ServletException("Failed to parse an endpoint.", e);
}
try {
channelFactory = createChannelFactory(remoteAddress);
} catch (ServletException e) {
throw e;
} catch (Exception e) {
throw new ServletException("Failed to create a channel factory.", e);
}
// Stuff for testing purpose
//ServerBootstrap b = new ServerBootstrap(new DefaultLocalServerChannelFactory());
//b.getPipeline().addLast("logger", new LoggingHandler(getClass(), InternalLogLevel.INFO, true));
//b.getPipeline().addLast("handler", new EchoHandler());
//b.bind(remoteAddress);
}
protected SocketAddress parseEndpoint(String endpoint) throws Exception {
if (endpoint.startsWith("local:")) {
return new LocalAddress(endpoint.substring(6).trim());
} else {
throw new ServletException(
"Invalid or unknown endpoint: " + endpoint);
}
}
protected ChannelFactory createChannelFactory(SocketAddress remoteAddress) throws Exception {
if (remoteAddress instanceof LocalAddress) {
return new DefaultLocalClientChannelFactory();
} else {
throw new ServletException(
"Unsupported remote address type: " +
remoteAddress.getClass().getName());
}
}
@Override
public void destroy() {
try {
destroyChannelFactory(channelFactory);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to destroy a channel factory.", e);
}
}
}
protected void destroyChannelFactory(ChannelFactory factory) throws Exception {
factory.releaseExternalResources();
}
@Override
protected void service(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
if (!"POST".equalsIgnoreCase(req.getMethod())) {
if (logger.isWarnEnabled()) {
logger.warn("Unallowed method: " + req.getMethod());
}
res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
return;
}
final ChannelPipeline pipeline = Channels.pipeline();
final ServletOutputStream out = res.getOutputStream();
final OutboundConnectionHandler handler = new OutboundConnectionHandler(out);
pipeline.addLast("handler", handler);
Channel channel = channelFactory.newChannel(pipeline);
ChannelFuture future = channel.connect(remoteAddress).awaitUninterruptibly();
if (!future.isSuccess()) {
Throwable cause = future.getCause();
if (logger.isWarnEnabled()) {
logger.warn("Endpoint unavailable: " + cause.getMessage(), cause);
}
res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
ChannelFuture lastWriteFuture = null;
try {
res.setStatus(HttpServletResponse.SC_OK);
res.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
res.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
// Initiate chunked encoding by flushing the headers.
out.flush();
PushbackInputStream in =
new PushbackInputStream(req.getInputStream());
while (channel.isConnected()) {
ChannelBuffer buffer;
try {
buffer = read(in);
} catch (EOFException e) {
break;
}
if (buffer == null) {
break;
}
lastWriteFuture = channel.write(buffer);
}
} finally {
if (lastWriteFuture == null) {
channel.close();
} else {
lastWriteFuture.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static ChannelBuffer read(PushbackInputStream in) throws IOException {
byte[] buf;
int readBytes;
int bytesToRead = in.available();
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else if (bytesToRead == 0) {
int b = in.read();
if (b < 0 || in.available() < 0) {
return null;
}
in.unread(b);
bytesToRead = in.available();
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else {
return null;
}
assert readBytes > 0;
ChannelBuffer buffer;
if (readBytes == buf.length) {
buffer = ChannelBuffers.wrappedBuffer(buf);
} else {
// A rare case, but it sometimes happen.
buffer = ChannelBuffers.wrappedBuffer(buf, 0, readBytes);
}
return buffer;
}
private static final class OutboundConnectionHandler extends SimpleChannelUpstreamHandler {
private final ServletOutputStream out;
public OutboundConnectionHandler(ServletOutputStream out) {
this.out = out;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
synchronized (this) {
buffer.readBytes(out, buffer.readableBytes());
out.flush();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Unexpected exception while HTTP tunneling", e.getCause());
}
e.getChannel().close();
}
}
}

View File

@ -52,4 +52,6 @@ public abstract class SctpChannel extends AbstractSelectableChannel {
public abstract <T> MessageInfo receive(ByteBuffer dst, T attachment, NotificationHandler<T> handler) throws IOException; public abstract <T> MessageInfo receive(ByteBuffer dst, T attachment, NotificationHandler<T> handler) throws IOException;
public abstract int send(ByteBuffer src, MessageInfo messageInfo) throws IOException; public abstract int send(ByteBuffer src, MessageInfo messageInfo) throws IOException;
public abstract Set<SctpSocketOption<?>> supportedOptions();
} }

View File

@ -0,0 +1,38 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.sctp;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
public abstract class AbstractSctpChannelSink extends AbstractChannelSink {
@Override
public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) {
Channel ch = pipeline.getChannel();
if (ch instanceof SctpChannelImpl) {
SctpChannelImpl channel = (SctpChannelImpl) ch;
return channel.worker.executeInIoThread(channel, task);
} else {
return super.execute(pipeline, task);
}
}
}

View File

@ -17,7 +17,6 @@ package io.netty.channel.sctp;
import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.ReceiveBufferSizePredictorFactory; import io.netty.channel.ReceiveBufferSizePredictorFactory;
import io.netty.channel.socket.SocketChannelConfig;
/** /**
* A {@link io.netty.channel.sctp.SctpChannelConfig} for a NIO SCTP/IP {@link io.netty.channel.sctp.SctpChannel}. * A {@link io.netty.channel.sctp.SctpChannelConfig} for a NIO SCTP/IP {@link io.netty.channel.sctp.SctpChannel}.

View File

@ -18,9 +18,6 @@ package io.netty.channel.sctp;
import com.sun.nio.sctp.Association; import com.sun.nio.sctp.Association;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioSocketChannelConfig;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;

View File

@ -257,8 +257,6 @@ class SctpChannelImpl extends AbstractChannel implements SctpChannel {
private final class WriteRequestQueue extends AbstractWriteRequestQueue { private final class WriteRequestQueue extends AbstractWriteRequestQueue {
private static final long serialVersionUID = -246694024103520626L;
private final ThreadLocalBoolean notifying = new ThreadLocalBoolean(); private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
WriteRequestQueue() { WriteRequestQueue() {

View File

@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory;
/** /**
*/ */
class SctpClientPipelineSink extends AbstractChannelSink { class SctpClientPipelineSink extends AbstractSctpChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); InternalLoggerFactory.getInstance(SctpClientPipelineSink.class);
@ -262,7 +261,7 @@ class SctpClientPipelineSink extends AbstractChannelSink {
wakenUp.set(false); wakenUp.set(false);
try { try {
int selectedKeyCount = selector.select(500); int selectedKeyCount = selector.select(10);
// 'wakenUp.compareAndSet(false, true)' is always evaluated // 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up // before calling 'selector.wakeup()' to reduce the wake-up
@ -302,9 +301,9 @@ class SctpClientPipelineSink extends AbstractChannelSink {
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
} }
// Handle connection timeout every 0.5 seconds approximately. // Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime(); long currentTimeNanos = System.nanoTime();
if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 500 * 1000000L) { if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 10 * 1000000L) {
lastConnectTimeoutCheckTimeNanos = currentTimeNanos; lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
processConnectTimeout(selector.keys(), currentTimeNanos); processConnectTimeout(selector.keys(), currentTimeNanos);
} }

View File

@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.sun.nio.sctp.SctpChannel; import com.sun.nio.sctp.SctpChannel;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -45,7 +44,7 @@ import io.netty.util.internal.DeadLockProofWorker;
/** /**
*/ */
class SctpServerPipelineSink extends AbstractChannelSink { class SctpServerPipelineSink extends AbstractSctpChannelSink {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); InternalLoggerFactory.getInstance(SctpServerPipelineSink.class);

View File

@ -31,6 +31,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -45,6 +46,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.MessageEvent; import io.netty.channel.MessageEvent;
import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.ReceiveBufferSizePredictor;
import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer;
import io.netty.channel.socket.ChannelRunnableWrapper;
import io.netty.channel.socket.Worker;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.DeadLockProofWorker;
@ -53,7 +56,7 @@ import io.netty.util.internal.QueueFactory;
/** /**
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
class SctpWorker implements Runnable { class SctpWorker implements Worker {
private static final InternalLogger logger = private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SctpWorker.class); InternalLoggerFactory.getInstance(SctpWorker.class);
@ -64,13 +67,15 @@ class SctpWorker implements Runnable {
private final Executor executor; private final Executor executor;
private boolean started; private boolean started;
private volatile Thread thread; volatile Thread thread;
volatile Selector selector; volatile Selector selector;
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool();
@ -188,6 +193,7 @@ class SctpWorker implements Runnable {
cancelledKeys = 0; cancelledKeys = 0;
processRegisterTaskQueue(); processRegisterTaskQueue();
processEventQueue();
processWriteTaskQueue(); processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
@ -240,7 +246,35 @@ class SctpWorker implements Runnable {
} }
} }
} }
@Override
public ChannelFuture executeInIoThread(Channel channel, Runnable task) {
if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) {
try {
task.run();
return succeededFuture(channel);
} catch (Throwable t) {
return failedFuture(channel, t);
}
} else {
ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task);
boolean added = eventQueue.offer(channelRunnable);
if (added) {
// wake up the selector to speed things
selector.wakeup();
} else {
channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task));
}
return channelRunnable;
}
}
static boolean isIoThread(SctpChannelImpl channel) {
return Thread.currentThread() == channel.worker.thread;
}
private void processRegisterTaskQueue() throws IOException { private void processRegisterTaskQueue() throws IOException {
for (; ;) { for (; ;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = registerTaskQueue.poll();
@ -264,7 +298,18 @@ class SctpWorker implements Runnable {
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException { private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next(); SelectionKey k = i.next();

View File

@ -19,8 +19,6 @@ import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelHandler; import io.netty.channel.SimpleChannelHandler;
import io.netty.channel.sctp.SctpNotificationEvent; import io.netty.channel.sctp.SctpNotificationEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/** /**
* SCTP Channel Handler (upstream + downstream) with SCTP notification handling * SCTP Channel Handler (upstream + downstream) with SCTP notification handling
@ -28,10 +26,6 @@ import io.netty.logging.InternalLoggerFactory;
public class SimpleSctpChannelHandler extends SimpleChannelHandler { public class SimpleSctpChannelHandler extends SimpleChannelHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SimpleSctpUpstreamHandler.class.getName());
@Override @Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent event) throws Exception { public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent event) throws Exception {
if (!(event instanceof SctpNotificationEvent)) { if (!(event instanceof SctpNotificationEvent)) {

View File

@ -19,17 +19,12 @@ import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelUpstreamHandler; import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.sctp.SctpNotificationEvent; import io.netty.channel.sctp.SctpNotificationEvent;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
/** /**
* SCTP Upstream Channel Handler with SCTP notification handling * SCTP Upstream Channel Handler with SCTP notification handling
*/ */
public class SimpleSctpUpstreamHandler extends SimpleChannelUpstreamHandler { public class SimpleSctpUpstreamHandler extends SimpleChannelUpstreamHandler {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SimpleSctpUpstreamHandler.class.getName());
@Override @Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent event) throws Exception { public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent event) throws Exception {
if (!(event instanceof SctpNotificationEvent)) { if (!(event instanceof SctpNotificationEvent)) {

View File

@ -15,7 +15,6 @@
*/ */
package io.netty.testsuite.transport; package io.netty.testsuite.transport;
import com.sun.nio.sctp.SctpChannel;
import com.sun.nio.sctp.SctpServerChannel; import com.sun.nio.sctp.SctpServerChannel;
import io.netty.bootstrap.ClientBootstrap; import io.netty.bootstrap.ClientBootstrap;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
@ -40,7 +39,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;

View File

@ -23,8 +23,6 @@ import io.netty.channel.*;
import io.netty.channel.sctp.SctpClientSocketChannelFactory; import io.netty.channel.sctp.SctpClientSocketChannelFactory;
import io.netty.channel.sctp.SctpFrame; import io.netty.channel.sctp.SctpFrame;
import io.netty.channel.sctp.SctpServerSocketChannelFactory; import io.netty.channel.sctp.SctpServerSocketChannelFactory;
import io.netty.channel.sctp.codec.SctpFrameDecoder;
import io.netty.channel.sctp.codec.SctpFrameEncoder;
import io.netty.testsuite.util.SctpSocketAddresses; import io.netty.testsuite.util.SctpSocketAddresses;
import io.netty.util.internal.ExecutorUtil; import io.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass; import org.junit.AfterClass;

View File

@ -16,7 +16,6 @@
package io.netty.testsuite.transport.sctp; package io.netty.testsuite.transport.sctp;
import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFactory;
import io.netty.channel.sctp.SctpClientSocketChannelFactory;
import io.netty.channel.sctp.SctpServerSocketChannelFactory; import io.netty.channel.sctp.SctpServerSocketChannelFactory;
import io.netty.testsuite.transport.AbstractSocketServerBootstrapTest; import io.netty.testsuite.transport.AbstractSocketServerBootstrapTest;

View File

@ -43,7 +43,28 @@ public abstract class AbstractChannelSink implements ChannelSink {
if (actualCause == null) { if (actualCause == null) {
actualCause = cause; actualCause = cause;
} }
if (isFireExceptionCaughtLater(event, actualCause)) {
fireExceptionCaughtLater(event.getChannel(), actualCause);
} else {
fireExceptionCaught(event.getChannel(), actualCause);
}
}
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
return false;
}
fireExceptionCaught(event.getChannel(), actualCause); /**
* This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it
* in a better way
*/
@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
try {
task.run();
return Channels.succeededFuture(pipeline.getChannel());
} catch (Throwable t) {
return Channels.failedFuture(pipeline.getChannel(), t);
}
} }
} }

View File

@ -56,6 +56,10 @@ package io.netty.channel;
* <p> * <p>
* You will also find various helper methods in {@link Channels} to be useful * You will also find various helper methods in {@link Channels} to be useful
* to generate and send an artificial or manipulated event. * to generate and send an artificial or manipulated event.
* <p>
* <strong>Caution:</strong>
* <p>
* Use the *Later(..) methods of the {@link Channels} class if you want to send an upstream event from a {@link ChannelDownstreamHandler} otherwise you may run into threading issues.
* *
* <h3>State management</h3> * <h3>State management</h3>
* *

View File

@ -432,7 +432,6 @@ public interface ChannelPipeline {
*/ */
ChannelHandlerContext getContext(Class<? extends ChannelHandler> handlerType); ChannelHandlerContext getContext(Class<? extends ChannelHandler> handlerType);
/** /**
* Sends the specified {@link ChannelEvent} to the first * Sends the specified {@link ChannelEvent} to the first
* {@link ChannelUpstreamHandler} in this pipeline. * {@link ChannelUpstreamHandler} in this pipeline.
@ -451,6 +450,12 @@ public interface ChannelPipeline {
*/ */
void sendDownstream(ChannelEvent e); void sendDownstream(ChannelEvent e);
/**
* Schedules the specified task to be executed in the I/O thread associated
* with this pipeline's {@link Channel}.
*/
ChannelFuture execute(Runnable task);
/** /**
* Returns the {@link Channel} that this pipeline is attached to. * Returns the {@link Channel} that this pipeline is attached to.
* *

View File

@ -37,4 +37,10 @@ public interface ChannelSink {
* one of its {@link ChannelHandler}s process a {@link ChannelEvent}. * one of its {@link ChannelHandler}s process a {@link ChannelEvent}.
*/ */
void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception;
/**
* Execute the given {@link Runnable} later in the io-thread.
* Some implementation may not support this and just execute it directly.
*/
ChannelFuture execute(ChannelPipeline pipeline, Runnable task);
} }

View File

@ -298,6 +298,22 @@ public final class Channels {
ctx.getChannel(), message, remoteAddress)); ctx.getChannel(), message, remoteAddress));
} }
/**
* Sends a {@code "writeComplete"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} in the next io-thread.
*/
public static ChannelFuture fireWriteCompleteLater(final Channel channel, final long amount) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireWriteComplete(channel, amount);
}
});
}
/** /**
* Sends a {@code "writeComplete"} event to the first * Sends a {@code "writeComplete"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -321,6 +337,25 @@ public final class Channels {
public static void fireWriteComplete(ChannelHandlerContext ctx, long amount) { public static void fireWriteComplete(ChannelHandlerContext ctx, long amount) {
ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount)); ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount));
} }
/**
* Sends a {@code "channelInterestChanged"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} once the io-thread runs again.
*/
public static ChannelFuture fireChannelInterestChangedLater(final Channel channel) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireChannelInterestChanged(channel);
}
});
}
/** /**
* Sends a {@code "channelInterestChanged"} event to the first * Sends a {@code "channelInterestChanged"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -346,6 +381,21 @@ public final class Channels {
ctx.getChannel(), ChannelState.INTEREST_OPS, Channel.OP_READ)); ctx.getChannel(), ChannelState.INTEREST_OPS, Channel.OP_READ));
} }
/**
* Sends a {@code "channelDisconnected"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} once the io-thread runs again.
*/
public static ChannelFuture fireChannelDisconnectedLater(final Channel channel) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireChannelDisconnected(channel);
}
});
}
/** /**
* Sends a {@code "channelDisconnected"} event to the first * Sends a {@code "channelDisconnected"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -368,6 +418,23 @@ public final class Channels {
ctx.getChannel(), ChannelState.CONNECTED, null)); ctx.getChannel(), ChannelState.CONNECTED, null));
} }
/**
* Sends a {@code "channelUnbound"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} once the io-thread runs again.
*/
public static ChannelFuture fireChannelUnboundLater(final Channel channel) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireChannelUnbound(channel);
}
});
}
/** /**
* Sends a {@code "channelUnbound"} event to the first * Sends a {@code "channelUnbound"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -390,6 +457,24 @@ public final class Channels {
ctx.getChannel(), ChannelState.BOUND, null)); ctx.getChannel(), ChannelState.BOUND, null));
} }
/**
* Sends a {@code "channelClosed"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} once the io-thread runs again.
*/
public static ChannelFuture fireChannelClosedLater(final Channel channel) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireChannelClosed(channel);
}
});
}
/** /**
* Sends a {@code "channelClosed"} event to the first * Sends a {@code "channelClosed"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -418,6 +503,39 @@ public final class Channels {
ctx.getChannel(), ChannelState.OPEN, Boolean.FALSE)); ctx.getChannel(), ChannelState.OPEN, Boolean.FALSE));
} }
/**
* Sends a {@code "exceptionCaught"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
* the specified {@link Channel} once the io-thread runs again.
*/
public static ChannelFuture fireExceptionCaughtLater(final Channel channel, final Throwable cause) {
return channel.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireExceptionCaught(channel, cause);
}
});
}
/**
* Sends a {@code "exceptionCaught"} event to the
* {@link ChannelUpstreamHandler} which is placed in the closest upstream
* from the handler associated with the specified
* {@link ChannelHandlerContext} once the io-thread runs again.
*/
public static ChannelFuture fireExceptionCaughtLater(final ChannelHandlerContext ctx, final Throwable cause) {
return ctx.getPipeline().execute(new Runnable() {
@Override
public void run() {
fireExceptionCaught(ctx, cause);
}
});
}
/** /**
* Sends a {@code "exceptionCaught"} event to the first * Sends a {@code "exceptionCaught"} event to the first
* {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of
@ -444,6 +562,7 @@ public final class Channels {
new DefaultChildChannelStateEvent(channel, childChannel)); new DefaultChildChannelStateEvent(channel, childChannel));
} }
/** /**
* Sends a {@code "bind"} request to the last * Sends a {@code "bind"} request to the last
* {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of

View File

@ -21,6 +21,7 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.RejectedExecutionException;
import io.netty.logging.InternalLogger; import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory; import io.netty.logging.InternalLoggerFactory;
@ -649,6 +650,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return realCtx; return realCtx;
} }
@Override
public ChannelFuture execute(Runnable task) {
return getSink().execute(this, task);
}
protected void notifyHandlerException(ChannelEvent e, Throwable t) { protected void notifyHandlerException(ChannelEvent e, Throwable t) {
if (e instanceof ExceptionEvent) { if (e instanceof ExceptionEvent) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
@ -832,5 +838,14 @@ public class DefaultChannelPipeline implements ChannelPipeline {
ChannelEvent e, ChannelPipelineException cause) throws Exception { ChannelEvent e, ChannelPipelineException cause) throws Exception {
throw cause; throw cause;
} }
@Override
public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
if (logger.isWarnEnabled()) {
logger.warn("Not attached yet; rejecting: " + task);
}
return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
}
} }
} }

Some files were not shown because too many files have changed in this diff Show More