CompatibleObjectEncoder cached ObjectOutputStream backed by release buffer bug
Motivation: ObjectOutputStream uses a Channel Attribute to cache a ObjectOutputStream which is backed by a ByteBuf that may be released after an object is encoded and the underlying buffer is written to the channel. On subsequent encode operations the cached ObjectOutputStream will be invalid and lead to a reference count exception. Modifications: - CompatibleObjectEncoder should not cache a ObjectOutputStream. Result: CompatibleObjectEncoder doesn't use a cached object backed by a released ByteBuf.
This commit is contained in:
parent
de4692e6e0
commit
7cf56300c5
@ -35,10 +35,6 @@ import java.io.Serializable;
|
||||
* {@link ObjectInputStream} and {@link ObjectOutputStream}.
|
||||
*/
|
||||
public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable> {
|
||||
|
||||
private static final AttributeKey<ObjectOutputStream> OOS =
|
||||
AttributeKey.valueOf(CompatibleObjectEncoder.class.getName() + ".OOS");
|
||||
|
||||
private final int resetInterval;
|
||||
private int writtenObjects;
|
||||
|
||||
@ -77,17 +73,8 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable>
|
||||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
|
||||
Attribute<ObjectOutputStream> oosAttr = ctx.attr(OOS);
|
||||
ObjectOutputStream oos = oosAttr.get();
|
||||
if (oos == null) {
|
||||
oos = newObjectOutputStream(new ByteBufOutputStream(out));
|
||||
ObjectOutputStream newOos = oosAttr.setIfAbsent(oos);
|
||||
if (newOos != null) {
|
||||
oos = newOos;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (oos) {
|
||||
ObjectOutputStream oos = newObjectOutputStream(new ByteBufOutputStream(out));
|
||||
try {
|
||||
if (resetInterval != 0) {
|
||||
// Resetting will prevent OOM on the receiving side.
|
||||
writtenObjects ++;
|
||||
@ -98,6 +85,8 @@ public class CompatibleObjectEncoder extends MessageToByteEncoder<Serializable>
|
||||
|
||||
oos.writeObject(msg);
|
||||
oos.flush();
|
||||
} finally {
|
||||
oos.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.codec.serialization;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufInputStream;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.Serializable;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class CompatibleObjectEncoderTest {
|
||||
@Test
|
||||
public void testMultipleEncodeReferenceCount() throws IOException, ClassNotFoundException {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new CompatibleObjectEncoder());
|
||||
testEncode(channel, new TestSerializable(6, 8));
|
||||
testEncode(channel, new TestSerializable(10, 5));
|
||||
testEncode(channel, new TestSerializable(1, 5));
|
||||
assertFalse(channel.finishAndReleaseAll());
|
||||
}
|
||||
|
||||
private static void testEncode(EmbeddedChannel channel, TestSerializable original)
|
||||
throws IOException, ClassNotFoundException {
|
||||
channel.writeOutbound(original);
|
||||
Object o = channel.readOutbound();
|
||||
ByteBuf buf = (ByteBuf) o;
|
||||
ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buf));
|
||||
try {
|
||||
assertEquals(original, ois.readObject());
|
||||
} finally {
|
||||
buf.release();
|
||||
ois.close();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class TestSerializable implements Serializable {
|
||||
public final int x;
|
||||
public final int y;
|
||||
|
||||
TestSerializable(int x, int y) {
|
||||
this.x = x;
|
||||
this.y = y;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof TestSerializable)) {
|
||||
return false;
|
||||
}
|
||||
TestSerializable rhs = (TestSerializable) o;
|
||||
return x == rhs.x && y == rhs.y;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 31 * (31 + x) + y;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user