CompatibleObjectEncoder cached ObjectOutputStream backed by release buffer bug

Motivation:
ObjectOutputStream uses a Channel Attribute to cache a ObjectOutputStream which is backed by a ByteBuf that may be released after an object is encoded and the underlying buffer is written to the channel. On subsequent encode operations the cached ObjectOutputStream will be invalid and lead to a reference count exception.

Modifications:
- CompatibleObjectEncoder should not cache a ObjectOutputStream.

Result:
CompatibleObjectEncoder doesn't use a cached object backed by a released ByteBuf.
This commit is contained in:
Scott Mitchell 2016-10-31 10:21:58 -07:00
parent 705e3f629a
commit e47da7be77
2 changed files with 82 additions and 15 deletions

View File

@ -35,10 +35,6 @@ import java.io.Serializable;
* {@link ObjectInputStream} and {@link ObjectOutputStream}. * {@link ObjectInputStream} and {@link ObjectOutputStream}.
*/ */
public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable> { public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final AttributeKey<ObjectOutputStream> OOS =
AttributeKey.valueOf(CompatibleObjectEncoder.class, "OOS");
private final int resetInterval; private final int resetInterval;
private int writtenObjects; private int writtenObjects;
@ -77,17 +73,8 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable>
@Override @Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS); ObjectOutputStream oos = newObjectOutputStream(new ByteBufOutputStream(out));
ObjectOutputStream oos = oosAttr.get(); try {
if (oos == null) {
oos = newObjectOutputStream(new ByteBufOutputStream(out));
ObjectOutputStream newOos = oosAttr.setIfAbsent(oos);
if (newOos != null) {
oos = newOos;
}
}
synchronized (oos) {
if (resetInterval != 0) { if (resetInterval != 0) {
// Resetting will prevent OOM on the receiving side. // Resetting will prevent OOM on the receiving side.
writtenObjects ++; writtenObjects ++;
@ -98,6 +85,8 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable>
oos.writeObject(msg); oos.writeObject(msg);
oos.flush(); oos.flush();
} finally {
oos.close();
} }
} }
} }

View File

@ -0,0 +1,78 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.serialization;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CompatibleObjectEncoderTest {
@Test
public void testMultipleEncodeReferenceCount() throws IOException, ClassNotFoundException {
EmbeddedChannel channel = new EmbeddedChannel(new CompatibleObjectEncoder());
testEncode(channel, new TestSerializable(6, 8));
testEncode(channel, new TestSerializable(10, 5));
testEncode(channel, new TestSerializable(1, 5));
assertFalse(channel.finishAndReleaseAll());
}
private static void testEncode(EmbeddedChannel channel, TestSerializable original)
throws IOException, ClassNotFoundException {
channel.writeOutbound(original);
Object o = channel.readOutbound();
ByteBuf buf = (ByteBuf) o;
ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buf));
try {
assertEquals(original, ois.readObject());
} finally {
buf.release();
ois.close();
}
}
private static final class TestSerializable implements Serializable {
public final int x;
public final int y;
TestSerializable(int x, int y) {
this.x = x;
this.y = y;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof TestSerializable)) {
return false;
}
TestSerializable rhs = (TestSerializable) o;
return x == rhs.x && y == rhs.y;
}
@Override
public int hashCode() {
return 31 * (31 + x) + y;
}
}
}