Revert "Store reference to IovArray in the EpollEventLoop to reduce thread local access."

This reverts commit ba19837f2f as this is not safe with a ForkJoinPool based EventLoop.
This commit is contained in:
Norman Maurer 2015-11-20 10:07:10 -08:00
parent cd859a7a4d
commit efeb26aa97
4 changed files with 63 additions and 15 deletions

View File

@ -464,9 +464,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
in.forEachFlushedMessage(array);
IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.

View File

@ -382,8 +382,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
writtenBytes = fd().sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
remoteAddress.getAddress(), remoteAddress.getPort());
} else if (data instanceof CompositeByteBuf) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
array.add(data);
IovArray array = IovArrayThreadLocal.get((CompositeByteBuf) data);
int cnt = array.count();
assert cnt != 0;

View File

@ -55,7 +55,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
private final boolean allowGrowing;
private final EpollEventArray events;
private final IovArray iovArray = new IovArray();
private volatile int wakenUp;
private volatile int ioRatio = 50;
@ -101,14 +100,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
}
}
/**
* Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
*/
IovArray cleanArray() {
iovArray.clear();
return iovArray;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
@ -385,7 +376,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
}
} finally {
// release native memory
iovArray.release();
events.free();
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.util.concurrent.FastThreadLocal;
/**
* Allow to obtain {@link IovArray} instances.
*/
final class IovArrayThreadLocal {
private static final FastThreadLocal<IovArray> ARRAY = new FastThreadLocal<IovArray>() {
@Override
protected IovArray initialValue() throws Exception {
return new IovArray();
}
@Override
protected void onRemoval(IovArray value) throws Exception {
// free the direct memory now
value.release();
}
};
/**
* Returns a {@link IovArray} which is filled with the flushed messages of {@link ChannelOutboundBuffer}.
*/
static IovArray get(ChannelOutboundBuffer buffer) throws Exception {
IovArray array = ARRAY.get();
array.clear();
buffer.forEachFlushedMessage(array);
return array;
}
/**
* Returns a {@link IovArray} which is filled with the {@link CompositeByteBuf}.
*/
static IovArray get(CompositeByteBuf buf) throws Exception {
IovArray array = ARRAY.get();
array.clear();
array.add(buf);
return array;
}
private IovArrayThreadLocal() { }
}