diff --git a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java index 856a02b65c..2c8ca87ecb 100644 --- a/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/spdy/SpdyFrameEncoder.java @@ -23,15 +23,17 @@ import java.util.Set; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; /** * Encodes a SPDY Data or Control Frame into a {@link ChannelBuffer}. */ -public class SpdyFrameEncoder extends OneToOneEncoder { +public class SpdyFrameEncoder implements ChannelDownstreamHandler { private final int version; private volatile boolean finished; @@ -69,7 +71,6 @@ public class SpdyFrameEncoder extends OneToOneEncoder { * Creates a new instance with the specified parameters. */ public SpdyFrameEncoder(int version, int compressionLevel, int windowBits, int memLevel) { - super(); if (version < SPDY_MIN_VERSION || version > SPDY_MAX_VERSION) { throw new IllegalArgumentException( "unknown version: " + version); @@ -79,9 +80,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder { version, compressionLevel, windowBits, memLevel); } - @Override public void handleDownstream( - ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { + final ChannelHandlerContext ctx, ChannelEvent evt) throws Exception { if (evt instanceof ChannelStateEvent) { ChannelStateEvent e = (ChannelStateEvent) evt; switch (e.getState()) { @@ -96,13 +96,14 @@ public class SpdyFrameEncoder extends OneToOneEncoder { } } } - super.handleDownstream(ctx, evt); - } - @Override - protected Object encode( - ChannelHandlerContext ctx, Channel channel, Object msg) - throws Exception { + if (!(evt instanceof MessageEvent)) { + ctx.sendDownstream(evt); + return; + } + + final MessageEvent e = (MessageEvent) evt; + Object msg = e.getMessage(); if (msg instanceof SpdyDataFrame) { @@ -114,75 +115,95 @@ public class SpdyFrameEncoder extends OneToOneEncoder { header.writeInt(spdyDataFrame.getStreamId() & 0x7FFFFFFF); header.writeByte(flags); header.writeMedium(data.readableBytes()); - return ChannelBuffers.wrappedBuffer(header, data); + ChannelBuffer frame = ChannelBuffers.wrappedBuffer(header, data); + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdySynStreamFrame) { - SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; - ChannelBuffer data = compressHeaderBlock( - encodeHeaderBlock(version, spdySynStreamFrame)); - byte flags = spdySynStreamFrame.isLast() ? SPDY_FLAG_FIN : 0; - if (spdySynStreamFrame.isUnidirectional()) { - flags |= SPDY_FLAG_UNIDIRECTIONAL; - } - int headerBlockLength = data.readableBytes(); - int length; - if (version < 3) { - length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength; - } else { - length = 10 + headerBlockLength; - } - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(version | 0x8000); - frame.writeShort(SPDY_SYN_STREAM_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(spdySynStreamFrame.getStreamId()); - frame.writeInt(spdySynStreamFrame.getAssociatedToStreamId()); - if (version < 3) { - // Restrict priorities for SPDY/2 to between 0 and 3 - byte priority = spdySynStreamFrame.getPriority(); - if (priority > 3) { - priority = 3; + synchronized (headerBlockCompressor) { + SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg; + ChannelBuffer data = compressHeaderBlock( + encodeHeaderBlock(version, spdySynStreamFrame)); + byte flags = spdySynStreamFrame.isLast() ? SPDY_FLAG_FIN : 0; + if (spdySynStreamFrame.isUnidirectional()) { + flags |= SPDY_FLAG_UNIDIRECTIONAL; } - frame.writeShort((priority & 0xFF) << 14); - } else { - frame.writeShort((spdySynStreamFrame.getPriority() & 0xFF) << 13); + int headerBlockLength = data.readableBytes(); + int length; + if (version < 3) { + length = headerBlockLength == 0 ? 12 : 10 + headerBlockLength; + } else { + length = 10 + headerBlockLength; + } + ChannelBuffer frame = ChannelBuffers.buffer( + ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 12); + frame.writeShort(version | 0x8000); + frame.writeShort(SPDY_SYN_STREAM_FRAME); + frame.writeByte(flags); + frame.writeMedium(length); + frame.writeInt(spdySynStreamFrame.getStreamId()); + frame.writeInt(spdySynStreamFrame.getAssociatedToStreamId()); + if (version < 3) { + // Restrict priorities for SPDY/2 to between 0 and 3 + byte priority = spdySynStreamFrame.getPriority(); + if (priority > 3) { + priority = 3; + } + frame.writeShort((priority & 0xFF) << 14); + } else { + frame.writeShort((spdySynStreamFrame.getPriority() & 0xFF) << 13); + } + if (version < 3 && data.readableBytes() == 0) { + frame.writeShort(0); + } + // Writes of compressed data must occur in order + final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); + e.getChannel().getPipeline().execute(new Runnable() { + public void run() { + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); + } + }); } - if (version < 3 && data.readableBytes() == 0) { - frame.writeShort(0); - } - return ChannelBuffers.wrappedBuffer(frame, data); + return; } else if (msg instanceof SpdySynReplyFrame) { - SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg; - ChannelBuffer data = compressHeaderBlock( - encodeHeaderBlock(version, spdySynReplyFrame)); - byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0; - int headerBlockLength = data.readableBytes(); - int length; - if (version < 3) { - length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength; - } else { - length = 4 + headerBlockLength; - } - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(version | 0x8000); - frame.writeShort(SPDY_SYN_REPLY_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(spdySynReplyFrame.getStreamId()); - if (version < 3) { - if (data.readableBytes() == 0) { - frame.writeInt(0); + synchronized (headerBlockCompressor) { + SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg; + ChannelBuffer data = compressHeaderBlock( + encodeHeaderBlock(version, spdySynReplyFrame)); + byte flags = spdySynReplyFrame.isLast() ? SPDY_FLAG_FIN : 0; + int headerBlockLength = data.readableBytes(); + int length; + if (version < 3) { + length = headerBlockLength == 0 ? 8 : 6 + headerBlockLength; } else { - frame.writeShort(0); + length = 4 + headerBlockLength; } + ChannelBuffer frame = ChannelBuffers.buffer( + ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + 8); + frame.writeShort(version | 0x8000); + frame.writeShort(SPDY_SYN_REPLY_FRAME); + frame.writeByte(flags); + frame.writeMedium(length); + frame.writeInt(spdySynReplyFrame.getStreamId()); + if (version < 3) { + if (data.readableBytes() == 0) { + frame.writeInt(0); + } else { + frame.writeShort(0); + } + } + // Writes of compressed data must occur in order + final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); + e.getChannel().getPipeline().execute(new Runnable() { + public void run() { + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); + } + }); } - return ChannelBuffers.wrappedBuffer(frame, data); + return; } else if (msg instanceof SpdyRstStreamFrame) { @@ -194,7 +215,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder { frame.writeInt(8); frame.writeInt(spdyRstStreamFrame.getStreamId()); frame.writeInt(spdyRstStreamFrame.getStatus().getCode()); - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdySettingsFrame) { @@ -234,7 +256,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder { } frame.writeInt(spdySettingsFrame.getValue(id)); } - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdyNoOpFrame) { @@ -243,7 +266,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder { frame.writeShort(version | 0x8000); frame.writeShort(SPDY_NOOP_FRAME); frame.writeInt(0); - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdyPingFrame) { @@ -254,7 +278,8 @@ public class SpdyFrameEncoder extends OneToOneEncoder { frame.writeShort(SPDY_PING_FRAME); frame.writeInt(4); frame.writeInt(spdyPingFrame.getId()); - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdyGoAwayFrame) { @@ -269,32 +294,42 @@ public class SpdyFrameEncoder extends OneToOneEncoder { if (version >= 3) { frame.writeInt(spdyGoAwayFrame.getStatus().getCode()); } - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } else if (msg instanceof SpdyHeadersFrame) { - SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg; - ChannelBuffer data = compressHeaderBlock( - encodeHeaderBlock(version, spdyHeadersFrame)); - byte flags = spdyHeadersFrame.isLast() ? SPDY_FLAG_FIN : 0; - int headerBlockLength = data.readableBytes(); - int length; - if (version < 3) { - length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength; - } else { - length = 4 + headerBlockLength; + synchronized (headerBlockCompressor) { + SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg; + ChannelBuffer data = compressHeaderBlock( + encodeHeaderBlock(version, spdyHeadersFrame)); + byte flags = spdyHeadersFrame.isLast() ? SPDY_FLAG_FIN : 0; + int headerBlockLength = data.readableBytes(); + int length; + if (version < 3) { + length = headerBlockLength == 0 ? 4 : 6 + headerBlockLength; + } else { + length = 4 + headerBlockLength; + } + ChannelBuffer frame = ChannelBuffers.buffer( + ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); + frame.writeShort(version | 0x8000); + frame.writeShort(SPDY_HEADERS_FRAME); + frame.writeByte(flags); + frame.writeMedium(length); + frame.writeInt(spdyHeadersFrame.getStreamId()); + if (version < 3 && data.readableBytes() != 0) { + frame.writeShort(0); + } + // Writes of compressed data must occur in order + final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(frame, data); + e.getChannel().getPipeline().execute(new Runnable() { + public void run() { + Channels.write(ctx, e.getFuture(), buffer, e.getRemoteAddress()); + } + }); } - ChannelBuffer frame = ChannelBuffers.buffer( - ByteOrder.BIG_ENDIAN, SPDY_HEADER_SIZE + length); - frame.writeShort(version | 0x8000); - frame.writeShort(SPDY_HEADERS_FRAME); - frame.writeByte(flags); - frame.writeMedium(length); - frame.writeInt(spdyHeadersFrame.getStreamId()); - if (version < 3 && data.readableBytes() != 0) { - frame.writeShort(0); - } - return ChannelBuffers.wrappedBuffer(frame, data); + return; } else if (msg instanceof SpdyWindowUpdateFrame) { @@ -306,11 +341,12 @@ public class SpdyFrameEncoder extends OneToOneEncoder { frame.writeInt(8); frame.writeInt(spdyWindowUpdateFrame.getStreamId()); frame.writeInt(spdyWindowUpdateFrame.getDeltaWindowSize()); - return frame; + Channels.write(ctx, e.getFuture(), frame, e.getRemoteAddress()); + return; } // Unknown message type - return msg; + ctx.sendDownstream(evt); } private static void writeLengthField(int version, ChannelBuffer buffer, int length) { @@ -367,17 +403,16 @@ public class SpdyFrameEncoder extends OneToOneEncoder { return headerBlock; } - private synchronized ChannelBuffer compressHeaderBlock( - ChannelBuffer uncompressed) throws Exception { + // always called while synchronized on headerBlockCompressor + private ChannelBuffer compressHeaderBlock(ChannelBuffer uncompressed) + throws Exception { if (uncompressed.readableBytes() == 0) { return ChannelBuffers.EMPTY_BUFFER; } ChannelBuffer compressed = ChannelBuffers.dynamicBuffer(); - synchronized (headerBlockCompressor) { - if (!finished) { - headerBlockCompressor.setInput(uncompressed); - headerBlockCompressor.encode(compressed); - } + if (!finished) { + headerBlockCompressor.setInput(uncompressed); + headerBlockCompressor.encode(compressed); } return compressed; }