Merge branch 'master' of https://github.com/netty/netty into task-61
This commit is contained in:
commit
18344fbc90
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.jboss.netty.example.discard;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@ -42,7 +41,7 @@ public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
|
||||
private static final Logger logger = Logger.getLogger(
|
||||
DiscardClientHandler.class.getName());
|
||||
|
||||
private final AtomicLong transferredBytes = new AtomicLong();
|
||||
private long transferredBytes = 0;
|
||||
private final byte[] content;
|
||||
|
||||
public DiscardClientHandler(int messageSize) {
|
||||
@ -54,7 +53,7 @@ public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
|
||||
}
|
||||
|
||||
public long getTransferredBytes() {
|
||||
return transferredBytes.get();
|
||||
return transferredBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -88,7 +87,7 @@ public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
@Override
|
||||
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
|
||||
transferredBytes.addAndGet(e.getWrittenAmount());
|
||||
transferredBytes =+e.getWrittenAmount();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.jboss.netty.example.discard;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
@ -38,10 +37,10 @@ public class DiscardServerHandler extends SimpleChannelUpstreamHandler {
|
||||
private static final Logger logger = Logger.getLogger(
|
||||
DiscardServerHandler.class.getName());
|
||||
|
||||
private final AtomicLong transferredBytes = new AtomicLong();
|
||||
private long transferredBytes = 0;
|
||||
|
||||
public long getTransferredBytes() {
|
||||
return transferredBytes.get();
|
||||
return transferredBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -57,7 +56,7 @@ public class DiscardServerHandler extends SimpleChannelUpstreamHandler {
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||
// Discard received data silently by doing nothing.
|
||||
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
|
||||
transferredBytes += (((ChannelBuffer) e.getMessage()).readableBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -16,13 +16,14 @@
|
||||
package org.jboss.netty.handler.execution;
|
||||
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
@ -30,6 +31,7 @@ import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
|
||||
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
|
||||
/**
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
|
||||
@ -282,51 +284,53 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
|
||||
}
|
||||
|
||||
private final class ChildExecutor implements Executor, Runnable {
|
||||
private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
|
||||
|
||||
private final Queue<Runnable> tasks = new LinkedTransferQueue<Runnable>();
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
|
||||
ChildExecutor() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
boolean needsExecution;
|
||||
synchronized (tasks) {
|
||||
needsExecution = tasks.isEmpty();
|
||||
tasks.add(command);
|
||||
}
|
||||
// TODO: What todo if the add return false ?
|
||||
tasks.add(command);
|
||||
|
||||
|
||||
if (needsExecution) {
|
||||
if (isRunning.get() == false) {
|
||||
doUnorderedExecute(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Thread thread = Thread.currentThread();
|
||||
for (;;) {
|
||||
final Runnable task;
|
||||
synchronized (tasks) {
|
||||
task = tasks.getFirst();
|
||||
}
|
||||
|
||||
boolean ran = false;
|
||||
beforeExecute(thread, task);
|
||||
// check if its already running by using CAS. If so just return here. So in the worst case the thread
|
||||
// is executed and do nothing
|
||||
if (isRunning.compareAndSet(false, true)) {
|
||||
try {
|
||||
task.run();
|
||||
ran = true;
|
||||
onAfterExecute(task, null);
|
||||
} catch (RuntimeException e) {
|
||||
if (!ran) {
|
||||
onAfterExecute(task, e);
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
synchronized (tasks) {
|
||||
tasks.removeFirst();
|
||||
if (tasks.isEmpty()) {
|
||||
Thread thread = Thread.currentThread();
|
||||
for (;;) {
|
||||
final Runnable task = tasks.poll();
|
||||
// if the task is null we should exit the loop
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
boolean ran = false;
|
||||
beforeExecute(thread, task);
|
||||
try {
|
||||
task.run();
|
||||
ran = true;
|
||||
onAfterExecute(task, null);
|
||||
} catch (RuntimeException e) {
|
||||
if (!ran) {
|
||||
onAfterExecute(task, e);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// set it back to not running
|
||||
isRunning.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2009 Red Hat, Inc.
|
||||
* Copyright 2011 Red Hat, Inc.
|
||||
*
|
||||
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with the
|
||||
@ -26,46 +26,23 @@ import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.FileRegion;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
|
||||
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
|
||||
import org.jboss.netty.handler.ssl.SslHandler;
|
||||
|
||||
/**
|
||||
* {@link ChannelDownstreamHandler} implementation which encodes a {@link FileRegion} to {@link ChannelBuffer}'s if one of the given {@link ChannelHandler} was found in the {@link ChannelPipeline}.
|
||||
*
|
||||
* This {@link ChannelDownstreamHandler} should be used if you plan to write {@link FileRegion} objects and also have some {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} which needs to transform
|
||||
* the to be written {@link ChannelBuffer} in any case. This is for example the case with {@link SslHandler} and {@link ZlibDecoder}.
|
||||
* the to be written {@link ChannelBuffer} in any case. This could be for example {@link ChannelDownstreamHandler}'s which needs to encrypt or compress messages.
|
||||
*
|
||||
* Users of this {@link FileRegionEncoder} should add / remove this {@link ChannelDownstreamHandler} on the fly to get the best performance out of their system.
|
||||
*
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
* @author <a href="http://www.murkycloud.com/">Norman Maurer</a>
|
||||
*
|
||||
*/
|
||||
@ChannelHandler.Sharable
|
||||
public class FileRegionEncoder implements ChannelDownstreamHandler{
|
||||
|
||||
private final Class<? extends ChannelHandler>[] handlers;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new {@link FileRegionEncoder} which checks if one of the given {@link ChannelHandler}'s is contained in the {@link ChannelPipeline} and if so convert the {@link FileRegion} to {@link ChannelBuffer}'s.
|
||||
*
|
||||
* If the given <code>array</code> is empty it will encode the {@link FileRegion} to {@link ChannelBuffer}'s in all cases.
|
||||
*/
|
||||
public FileRegionEncoder(Class<? extends ChannelHandler>... handlers) {
|
||||
if (handlers == null) {
|
||||
throw new NullPointerException("handlers");
|
||||
}
|
||||
this.handlers = handlers;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new {@link FileRegionEncoder} which checks for the present of {@link SslHandler} and {@link ZlibEncoder} once a {@link FileRegion} was written. If once of the two handlers is found it will encode the {@link FileRegion} to {@link ChannelBuffer}'s
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public FileRegionEncoder() {
|
||||
this(SslHandler.class, ZlibEncoder.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleDownstream(
|
||||
ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
|
||||
@ -77,46 +54,23 @@ public class FileRegionEncoder implements ChannelDownstreamHandler{
|
||||
MessageEvent e = (MessageEvent) evt;
|
||||
Object originalMessage = e.getMessage();
|
||||
if (originalMessage instanceof FileRegion) {
|
||||
|
||||
if (isConvertNeeded(ctx, e)) {
|
||||
FileRegion fr = (FileRegion) originalMessage;
|
||||
WritableByteChannel bchannel = new ChannelWritableByteChannel(ctx, e);
|
||||
|
||||
int length = 0;
|
||||
long i = 0;
|
||||
while ((i = fr.transferTo(bchannel, length)) > 0) {
|
||||
length += i;
|
||||
if (length >= fr.getCount()) {
|
||||
break;
|
||||
}
|
||||
|
||||
FileRegion fr = (FileRegion) originalMessage;
|
||||
WritableByteChannel bchannel = new ChannelWritableByteChannel(ctx, e);
|
||||
|
||||
int length = 0;
|
||||
long i = 0;
|
||||
while ((i = fr.transferTo(bchannel, length)) > 0) {
|
||||
length += i;
|
||||
if (length >= fr.getCount()) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// no converting is needed so just sent the event downstream
|
||||
ctx.sendDownstream(evt);
|
||||
}
|
||||
|
||||
} else {
|
||||
// no converting is needed so just sent the event downstream
|
||||
ctx.sendDownstream(evt);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> if the {@link FileRegion} does need to get converted to {@link ChannelBuffer}'s
|
||||
*
|
||||
*/
|
||||
private boolean isConvertNeeded(ChannelHandlerContext ctx, MessageEvent evt) throws Exception{
|
||||
if (handlers.length == 0) {
|
||||
return true;
|
||||
} else {
|
||||
for (int i = 0; i < handlers.length; i++) {
|
||||
if(ctx.getPipeline().get(handlers[i]) != null) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user