From f3f40b24254ab702cc4c18e5b4baf6bf7bf3f03b Mon Sep 17 00:00:00 2001 From: izstas Date: Thu, 19 Jul 2012 16:09:48 +0400 Subject: [PATCH 1/5] Added TRACE level to Internal Logger --- .../netty/logging/AbstractInternalLogger.java | 8 ++++ .../java/io/netty/logging/CommonsLogger.java | 15 +++++++ .../io/netty/logging/InternalLogLevel.java | 4 ++ .../java/io/netty/logging/InternalLogger.java | 15 +++++++ .../netty/logging/InternalLoggerFactory.java | 15 +++++++ .../java/io/netty/logging/JBossLogger.java | 15 +++++++ .../main/java/io/netty/logging/JdkLogger.java | 15 +++++++ .../java/io/netty/logging/Log4JLogger.java | 15 +++++++ .../java/io/netty/logging/OsgiLogger.java | 15 +++++++ .../java/io/netty/logging/Slf4JLogger.java | 15 +++++++ .../io/netty/logging/CommonsLoggerTest.java | 39 ++++++++++++++++++ .../logging/InternalLoggerFactoryTest.java | 30 ++++++++++++++ .../io/netty/logging/JBossLoggerTest.java | 39 ++++++++++++++++++ .../java/io/netty/logging/JdkLoggerTest.java | 40 +++++++++++++++++++ .../io/netty/logging/Log4JLoggerTest.java | 40 +++++++++++++++++++ .../io/netty/logging/Slf4JLoggerTest.java | 39 ++++++++++++++++++ 16 files changed, 359 insertions(+) diff --git a/common/src/main/java/io/netty/logging/AbstractInternalLogger.java b/common/src/main/java/io/netty/logging/AbstractInternalLogger.java index c35e2ef6a3..e88a2e3879 100644 --- a/common/src/main/java/io/netty/logging/AbstractInternalLogger.java +++ b/common/src/main/java/io/netty/logging/AbstractInternalLogger.java @@ -31,6 +31,8 @@ public abstract class AbstractInternalLogger implements InternalLogger { @Override public boolean isEnabled(InternalLogLevel level) { switch (level) { + case TRACE: + return isTraceEnabled(); case DEBUG: return isDebugEnabled(); case INFO: @@ -47,6 +49,9 @@ public abstract class AbstractInternalLogger implements InternalLogger { @Override public void log(InternalLogLevel level, String msg, Throwable cause) { switch (level) { + case TRACE: + trace(msg, cause); + break; case DEBUG: debug(msg, cause); break; @@ -67,6 +72,9 @@ public abstract class AbstractInternalLogger implements InternalLogger { @Override public void log(InternalLogLevel level, String msg) { switch (level) { + case TRACE: + trace(msg); + break; case DEBUG: debug(msg); break; diff --git a/common/src/main/java/io/netty/logging/CommonsLogger.java b/common/src/main/java/io/netty/logging/CommonsLogger.java index 0892f0a3cf..9a3378d31c 100644 --- a/common/src/main/java/io/netty/logging/CommonsLogger.java +++ b/common/src/main/java/io/netty/logging/CommonsLogger.java @@ -31,6 +31,16 @@ class CommonsLogger extends AbstractInternalLogger { this.loggerName = loggerName; } + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.trace(msg, cause); + } + @Override public void debug(String msg) { logger.debug(msg); @@ -61,6 +71,11 @@ class CommonsLogger extends AbstractInternalLogger { logger.info(msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + @Override public boolean isDebugEnabled() { return logger.isDebugEnabled(); diff --git a/common/src/main/java/io/netty/logging/InternalLogLevel.java b/common/src/main/java/io/netty/logging/InternalLogLevel.java index 0ec3ccd0f2..3a3e1291c2 100644 --- a/common/src/main/java/io/netty/logging/InternalLogLevel.java +++ b/common/src/main/java/io/netty/logging/InternalLogLevel.java @@ -19,6 +19,10 @@ package io.netty.logging; * The log level that {@link InternalLogger} can log at. */ public enum InternalLogLevel { + /** + * 'TRACE' log level. + */ + TRACE, /** * 'DEBUG' log level. */ diff --git a/common/src/main/java/io/netty/logging/InternalLogger.java b/common/src/main/java/io/netty/logging/InternalLogger.java index 65254fd788..1b1496ff8e 100644 --- a/common/src/main/java/io/netty/logging/InternalLogger.java +++ b/common/src/main/java/io/netty/logging/InternalLogger.java @@ -20,6 +20,11 @@ package io.netty.logging; * access this class outside of Netty. */ public interface InternalLogger { + /** + * Returns {@code true} if a TRACE level message is logged. + */ + boolean isTraceEnabled(); + /** * Returns {@code true} if a DEBUG level message is logged. */ @@ -45,6 +50,16 @@ public interface InternalLogger { */ boolean isEnabled(InternalLogLevel level); + /** + * Logs a TRACE level message. + */ + void trace(String msg); + + /** + * Logs a TRACE level message. + */ + void trace(String msg, Throwable cause); + /** * Logs a DEBUG level message. */ diff --git a/common/src/main/java/io/netty/logging/InternalLoggerFactory.java b/common/src/main/java/io/netty/logging/InternalLoggerFactory.java index 03815510dc..90cd8b4f02 100644 --- a/common/src/main/java/io/netty/logging/InternalLoggerFactory.java +++ b/common/src/main/java/io/netty/logging/InternalLoggerFactory.java @@ -66,6 +66,16 @@ public abstract class InternalLoggerFactory { final InternalLogger logger = getDefaultFactory().newInstance(name); return new InternalLogger() { + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.trace(msg, cause); + } + @Override public void debug(String msg) { logger.debug(msg); @@ -96,6 +106,11 @@ public abstract class InternalLoggerFactory { logger.info(msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + @Override public boolean isDebugEnabled() { return logger.isDebugEnabled(); diff --git a/common/src/main/java/io/netty/logging/JBossLogger.java b/common/src/main/java/io/netty/logging/JBossLogger.java index b0668df24a..f134ff19aa 100644 --- a/common/src/main/java/io/netty/logging/JBossLogger.java +++ b/common/src/main/java/io/netty/logging/JBossLogger.java @@ -29,6 +29,16 @@ class JBossLogger extends AbstractInternalLogger { this.logger = logger; } + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.trace(msg, cause); + } + @Override public void debug(String msg) { logger.debug(msg); @@ -59,6 +69,11 @@ class JBossLogger extends AbstractInternalLogger { logger.info(msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + @Override @SuppressWarnings("deprecation") public boolean isDebugEnabled() { diff --git a/common/src/main/java/io/netty/logging/JdkLogger.java b/common/src/main/java/io/netty/logging/JdkLogger.java index d5a6895ac9..274a0accaa 100644 --- a/common/src/main/java/io/netty/logging/JdkLogger.java +++ b/common/src/main/java/io/netty/logging/JdkLogger.java @@ -32,6 +32,16 @@ class JdkLogger extends AbstractInternalLogger { this.loggerName = loggerName; } + @Override + public void trace(String msg) { + logger.logp(Level.FINEST, loggerName, null, msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.logp(Level.FINEST, loggerName, null, msg, cause); + } + @Override public void debug(String msg) { logger.logp(Level.FINE, loggerName, null, msg); @@ -62,6 +72,11 @@ class JdkLogger extends AbstractInternalLogger { logger.logp(Level.INFO, loggerName, null, msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isLoggable(Level.FINEST); + } + @Override public boolean isDebugEnabled() { return logger.isLoggable(Level.FINE); diff --git a/common/src/main/java/io/netty/logging/Log4JLogger.java b/common/src/main/java/io/netty/logging/Log4JLogger.java index ed14865af7..5e379d2932 100644 --- a/common/src/main/java/io/netty/logging/Log4JLogger.java +++ b/common/src/main/java/io/netty/logging/Log4JLogger.java @@ -29,6 +29,16 @@ class Log4JLogger extends AbstractInternalLogger { this.logger = logger; } + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.trace(msg, cause); + } + @Override public void debug(String msg) { logger.debug(msg); @@ -59,6 +69,11 @@ class Log4JLogger extends AbstractInternalLogger { logger.info(msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + @Override public boolean isDebugEnabled() { return logger.isDebugEnabled(); diff --git a/common/src/main/java/io/netty/logging/OsgiLogger.java b/common/src/main/java/io/netty/logging/OsgiLogger.java index 696c9afd72..ddf1c2b282 100644 --- a/common/src/main/java/io/netty/logging/OsgiLogger.java +++ b/common/src/main/java/io/netty/logging/OsgiLogger.java @@ -34,6 +34,16 @@ class OsgiLogger extends AbstractInternalLogger { prefix = "[" + name + "] "; } + @Override + public void trace(String msg) { + // This logger doesn't have TRACE level + } + + @Override + public void trace(String msg, Throwable cause) { + // This logger doesn't have TRACE level + } + @Override public void debug(String msg) { LogService logService = parent.getLogService(); @@ -94,6 +104,11 @@ class OsgiLogger extends AbstractInternalLogger { } } + @Override + public boolean isTraceEnabled() { + return false; + } + @Override public boolean isDebugEnabled() { return true; diff --git a/common/src/main/java/io/netty/logging/Slf4JLogger.java b/common/src/main/java/io/netty/logging/Slf4JLogger.java index cc17e30ad5..259757d35c 100644 --- a/common/src/main/java/io/netty/logging/Slf4JLogger.java +++ b/common/src/main/java/io/netty/logging/Slf4JLogger.java @@ -28,6 +28,16 @@ class Slf4JLogger extends AbstractInternalLogger { this.logger = logger; } + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Throwable cause) { + logger.trace(msg, cause); + } + @Override public void debug(String msg) { logger.debug(msg); @@ -58,6 +68,11 @@ class Slf4JLogger extends AbstractInternalLogger { logger.info(msg, cause); } + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + @Override public boolean isDebugEnabled() { return logger.isDebugEnabled(); diff --git a/common/src/test/java/io/netty/logging/CommonsLoggerTest.java b/common/src/test/java/io/netty/logging/CommonsLoggerTest.java index e54480c568..c452f7de0b 100644 --- a/common/src/test/java/io/netty/logging/CommonsLoggerTest.java +++ b/common/src/test/java/io/netty/logging/CommonsLoggerTest.java @@ -23,6 +23,19 @@ import org.junit.Test; public class CommonsLoggerTest { private static final Exception e = new Exception(); + @Test + public void testIsTraceEnabled() { + org.apache.commons.logging.Log mock = + createStrictMock(org.apache.commons.logging.Log.class); + + expect(mock.isTraceEnabled()).andReturn(true); + replay(mock); + + InternalLogger logger = new CommonsLogger(mock, "foo"); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test public void testIsDebugEnabled() { org.apache.commons.logging.Log mock = @@ -75,6 +88,32 @@ public class CommonsLoggerTest { verify(mock); } + @Test + public void testTrace() { + org.apache.commons.logging.Log mock = + createStrictMock(org.apache.commons.logging.Log.class); + + mock.trace("a"); + replay(mock); + + InternalLogger logger = new CommonsLogger(mock, "foo"); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + org.apache.commons.logging.Log mock = + createStrictMock(org.apache.commons.logging.Log.class); + + mock.trace("a", e); + replay(mock); + + InternalLogger logger = new CommonsLogger(mock, "foo"); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { org.apache.commons.logging.Log mock = diff --git a/common/src/test/java/io/netty/logging/InternalLoggerFactoryTest.java b/common/src/test/java/io/netty/logging/InternalLoggerFactoryTest.java index cdc160d65e..0217e8a5a6 100644 --- a/common/src/test/java/io/netty/logging/InternalLoggerFactoryTest.java +++ b/common/src/test/java/io/netty/logging/InternalLoggerFactoryTest.java @@ -54,6 +54,16 @@ public class InternalLoggerFactoryTest { assertNotSame(mock, InternalLoggerFactory.getInstance("mock")); } + @Test + public void testIsTraceEnabled() { + expect(mock.isTraceEnabled()).andReturn(true); + replay(mock); + + InternalLogger logger = InternalLoggerFactory.getInstance("mock"); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test public void testIsDebugEnabled() { expect(mock.isDebugEnabled()).andReturn(true); @@ -94,6 +104,26 @@ public class InternalLoggerFactoryTest { verify(mock); } + @Test + public void testTrace() { + mock.trace("a"); + replay(mock); + + InternalLogger logger = InternalLoggerFactory.getInstance("mock"); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + mock.trace("a", e); + replay(mock); + + InternalLogger logger = InternalLoggerFactory.getInstance("mock"); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { mock.debug("a"); diff --git a/common/src/test/java/io/netty/logging/JBossLoggerTest.java b/common/src/test/java/io/netty/logging/JBossLoggerTest.java index a6a96a0031..4ac069b2b9 100644 --- a/common/src/test/java/io/netty/logging/JBossLoggerTest.java +++ b/common/src/test/java/io/netty/logging/JBossLoggerTest.java @@ -23,6 +23,19 @@ import org.junit.Test; public class JBossLoggerTest { private static final Exception e = new Exception(); + @Test + public void testIsTraceEnabled() { + org.jboss.logging.Logger mock = + createStrictMock(org.jboss.logging.Logger.class); + + expect(mock.isTraceEnabled()).andReturn(true); + replay(mock); + + InternalLogger logger = new JBossLogger(mock); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test @SuppressWarnings("deprecation") public void testIsDebugEnabled() { @@ -73,6 +86,32 @@ public class JBossLoggerTest { verify(mock); } + @Test + public void testTrace() { + org.jboss.logging.Logger mock = + createStrictMock(org.jboss.logging.Logger.class); + + mock.trace("a"); + replay(mock); + + InternalLogger logger = new JBossLogger(mock); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + org.jboss.logging.Logger mock = + createStrictMock(org.jboss.logging.Logger.class); + + mock.trace("a", e); + replay(mock); + + InternalLogger logger = new JBossLogger(mock); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { org.jboss.logging.Logger mock = diff --git a/common/src/test/java/io/netty/logging/JdkLoggerTest.java b/common/src/test/java/io/netty/logging/JdkLoggerTest.java index 7efbc03280..98d356049e 100644 --- a/common/src/test/java/io/netty/logging/JdkLoggerTest.java +++ b/common/src/test/java/io/netty/logging/JdkLoggerTest.java @@ -25,6 +25,20 @@ import org.junit.Test; public class JdkLoggerTest { private static final Exception e = new Exception(); + @Test + public void testIsTraceEnabled() { + + java.util.logging.Logger mock = + createStrictMock(java.util.logging.Logger.class); + + expect(mock.isLoggable(Level.FINEST)).andReturn(true); + replay(mock); + + InternalLogger logger = new JdkLogger(mock, "foo"); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test public void testIsDebugEnabled() { @@ -78,6 +92,32 @@ public class JdkLoggerTest { verify(mock); } + @Test + public void testTrace() { + java.util.logging.Logger mock = + createStrictMock(java.util.logging.Logger.class); + + mock.logp(Level.FINEST, "foo", null, "a"); + replay(mock); + + InternalLogger logger = new JdkLogger(mock, "foo"); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + java.util.logging.Logger mock = + createStrictMock(java.util.logging.Logger.class); + + mock.logp(Level.FINEST, "foo", null, "a", e); + replay(mock); + + InternalLogger logger = new JdkLogger(mock, "foo"); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { java.util.logging.Logger mock = diff --git a/common/src/test/java/io/netty/logging/Log4JLoggerTest.java b/common/src/test/java/io/netty/logging/Log4JLoggerTest.java index 5e8ac70746..5347314bd2 100644 --- a/common/src/test/java/io/netty/logging/Log4JLoggerTest.java +++ b/common/src/test/java/io/netty/logging/Log4JLoggerTest.java @@ -23,6 +23,20 @@ import org.junit.Test; public class Log4JLoggerTest { private static final Exception e = new Exception(); + @Test + public void testIsTraceEnabled() { + + org.apache.log4j.Logger mock = + createStrictMock(org.apache.log4j.Logger.class); + + expect(mock.isTraceEnabled()).andReturn(true); + replay(mock); + + InternalLogger logger = new Log4JLogger(mock); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test public void testIsDebugEnabled() { @@ -73,6 +87,32 @@ public class Log4JLoggerTest { verify(mock); } + @Test + public void testTrace() { + org.apache.log4j.Logger mock = + createStrictMock(org.apache.log4j.Logger.class); + + mock.trace("a"); + replay(mock); + + InternalLogger logger = new Log4JLogger(mock); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + org.apache.log4j.Logger mock = + createStrictMock(org.apache.log4j.Logger.class); + + mock.trace("a", e); + replay(mock); + + InternalLogger logger = new Log4JLogger(mock); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { org.apache.log4j.Logger mock = diff --git a/common/src/test/java/io/netty/logging/Slf4JLoggerTest.java b/common/src/test/java/io/netty/logging/Slf4JLoggerTest.java index 6350c18ed1..02e80cb387 100644 --- a/common/src/test/java/io/netty/logging/Slf4JLoggerTest.java +++ b/common/src/test/java/io/netty/logging/Slf4JLoggerTest.java @@ -23,6 +23,19 @@ import org.junit.Test; public class Slf4JLoggerTest { private static final Exception e = new Exception(); + @Test + public void testIsTraceEnabled() { + org.slf4j.Logger mock = + createStrictMock(org.slf4j.Logger.class); + + expect(mock.isTraceEnabled()).andReturn(true); + replay(mock); + + InternalLogger logger = new Slf4JLogger(mock); + assertTrue(logger.isTraceEnabled()); + verify(mock); + } + @Test public void testIsDebugEnabled() { org.slf4j.Logger mock = @@ -75,6 +88,32 @@ public class Slf4JLoggerTest { verify(mock); } + @Test + public void testTrace() { + org.slf4j.Logger mock = + createStrictMock(org.slf4j.Logger.class); + + mock.trace("a"); + replay(mock); + + InternalLogger logger = new Slf4JLogger(mock); + logger.trace("a"); + verify(mock); + } + + @Test + public void testTraceWithException() { + org.slf4j.Logger mock = + createStrictMock(org.slf4j.Logger.class); + + mock.trace("a", e); + replay(mock); + + InternalLogger logger = new Slf4JLogger(mock); + logger.trace("a", e); + verify(mock); + } + @Test public void testDebug() { org.slf4j.Logger mock = From 62ed610c1de29aee5a4f33f66524ca55bbe5cb40 Mon Sep 17 00:00:00 2001 From: izstas Date: Thu, 19 Jul 2012 16:15:36 +0400 Subject: [PATCH 2/5] Added TRACE level for LoggingHandler --- handler/src/main/java/io/netty/handler/logging/LogLevel.java | 1 + 1 file changed, 1 insertion(+) diff --git a/handler/src/main/java/io/netty/handler/logging/LogLevel.java b/handler/src/main/java/io/netty/handler/logging/LogLevel.java index 351583ceff..88b865fa21 100644 --- a/handler/src/main/java/io/netty/handler/logging/LogLevel.java +++ b/handler/src/main/java/io/netty/handler/logging/LogLevel.java @@ -18,6 +18,7 @@ package io.netty.handler.logging; import io.netty.logging.InternalLogLevel; public enum LogLevel { + TRACE(InternalLogLevel.TRACE), DEBUG(InternalLogLevel.DEBUG), INFO(InternalLogLevel.INFO), WARN(InternalLogLevel.WARN), From bdde5a20f607a7b65489ed6db5094852bc4f0b53 Mon Sep 17 00:00:00 2001 From: norman Date: Mon, 30 Jul 2012 07:44:53 +0200 Subject: [PATCH 3/5] Allow to set Expires attribute to a date in theast. See #479 --- .../src/main/java/io/netty/handler/codec/http/Cookie.java | 4 ++-- .../main/java/io/netty/handler/codec/http/DefaultCookie.java | 2 +- .../java/io/netty/handler/codec/http/ServerCookieEncoder.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/Cookie.java b/codec-http/src/main/java/io/netty/handler/codec/http/Cookie.java index 4cd1109c72..8849cf0f77 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/Cookie.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/Cookie.java @@ -87,7 +87,7 @@ public interface Cookie extends Comparable { void setComment(String comment); /** - * Returns the maximum age of this {@link Cookie} in seconds. + * Returns the maximum age of this {@link Cookie} in seconds or {@link Long#MIN_VALUE} if unspecified * * @return The maximum age of this {@link Cookie} */ @@ -97,7 +97,7 @@ public interface Cookie extends Comparable { * Sets the maximum age of this {@link Cookie} in seconds. * If an age of {@code 0} is specified, this {@link Cookie} will be * automatically removed by browser because it will expire immediately. - * If {@code -1} is specified, this {@link Cookie} will be removed when the + * If {@link Long#MIN_VALUE} is specified, this {@link Cookie} will be removed when the * browser is closed. * * @param maxAge The maximum age of this {@link Cookie} in seconds diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultCookie.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultCookie.java index 81a9b93ebf..98ee92bc5b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultCookie.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultCookie.java @@ -35,7 +35,7 @@ public class DefaultCookie implements Cookie { private boolean discard; private Set ports = Collections.emptySet(); private Set unmodifiablePorts = ports; - private long maxAge = -1; + private long maxAge = Long.MIN_VALUE; private int version; private boolean secure; private boolean httpOnly; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java index 4c917f52db..4a0c56d2d5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java @@ -54,7 +54,7 @@ public final class ServerCookieEncoder { add(buf, cookie.getName(), cookie.getValue()); - if (cookie.getMaxAge() >= 0) { + if (cookie.getMaxAge() != Long.MIN_VALUE) { if (cookie.getVersion() == 0) { addUnquoted(buf, CookieHeaderNames.EXPIRES, new HttpHeaderDateFormat().format( From ba1c7c5c550fec36b37d2d8bac1674620ab30c99 Mon Sep 17 00:00:00 2001 From: norman Date: Mon, 30 Jul 2012 08:01:46 +0200 Subject: [PATCH 4/5] Replace usage of QueueFactory with ConcurrentLinkedQueue and LinkedBlockingQueue. See #477 --- .../java/io/netty/handler/codec/spdy/SpdySession.java | 7 +++---- .../io/netty/channel/DefaultChannelHandlerContext.java | 8 ++++---- .../java/io/netty/channel/SingleThreadEventExecutor.java | 4 ++-- .../java/io/netty/channel/socket/oio/OioEventLoop.java | 4 ++-- .../channel/local/LocalTransportThreadModelTest.java | 6 +++--- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java index a8d20d5a55..727b2a9ec0 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdySession.java @@ -15,14 +15,13 @@ */ package io.netty.handler.codec.spdy; -import io.netty.util.internal.QueueFactory; - import java.util.Comparator; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; final class SpdySession { @@ -177,7 +176,7 @@ final class SpdySession { private final AtomicInteger sendWindowSize; private final AtomicInteger receiveWindowSize; private volatile int receiveWindowSizeLowerBound; - private final BlockingQueue pendingWriteQueue = QueueFactory.createQueue(); + private final Queue pendingWriteQueue = new ConcurrentLinkedQueue(); StreamState( byte priority, boolean remoteSideClosed, boolean localSideClosed, diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index e025f35ea5..c845595854 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -21,13 +21,13 @@ import io.netty.buffer.ChannelBuf; import io.netty.buffer.MessageBuf; import io.netty.buffer.Unpooled; import io.netty.util.DefaultAttributeMap; -import io.netty.util.internal.QueueFactory; import java.net.SocketAddress; import java.util.Collections; import java.util.EnumSet; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { @@ -746,7 +746,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements static final class MessageBridge { final MessageBuf msgBuf = Unpooled.messageBuffer(); - final BlockingQueue exchangeBuf = QueueFactory.createQueue(); + final Queue exchangeBuf = new ConcurrentLinkedQueue(); void fill() { if (msgBuf.isEmpty()) { @@ -771,7 +771,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements static final class ByteBridge { final ByteBuf byteBuf = Unpooled.buffer(); - final BlockingQueue exchangeBuf = QueueFactory.createQueue(); + final Queue exchangeBuf = new ConcurrentLinkedQueue(); void fill() { if (!byteBuf.readable()) { diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java index 20e23cad42..ca066cb2a2 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventExecutor.java @@ -17,7 +17,6 @@ package io.netty.channel; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.QueueFactory; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; @@ -71,7 +71,7 @@ public abstract class SingleThreadEventExecutor extends AbstractExecutorService } }; - private final BlockingQueue taskQueue = QueueFactory.createQueue(); + private final BlockingQueue taskQueue = new LinkedBlockingQueue(); private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java index 72130dea46..bca22c8119 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioEventLoop.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventExecutor; -import io.netty.util.internal.QueueFactory; import java.util.Collection; import java.util.Collections; @@ -31,6 +30,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -45,7 +45,7 @@ public class OioEventLoop implements EventLoop { final ThreadFactory threadFactory; final Set activeChildren = Collections.newSetFromMap( new ConcurrentHashMap()); - final Queue idleChildren = QueueFactory.createQueue(); + final Queue idleChildren = new ConcurrentLinkedQueue(); private final ChannelException tooManyChannels; private final Unsafe unsafe = new Unsafe() { @Override diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index cfca2d5851..012e9e8a2c 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -32,11 +32,11 @@ import io.netty.channel.ChannelOutboundMessageHandler; import io.netty.channel.DefaultEventExecutor; import io.netty.channel.EventExecutor; import io.netty.channel.EventLoop; -import io.netty.util.internal.QueueFactory; import java.util.HashSet; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -337,8 +337,8 @@ public class LocalTransportThreadModelTest { private final AtomicReference exception = new AtomicReference(); - private final Queue inboundThreadNames = QueueFactory.createQueue(); - private final Queue outboundThreadNames = QueueFactory.createQueue(); + private final Queue inboundThreadNames = new ConcurrentLinkedQueue(); + private final Queue outboundThreadNames = new ConcurrentLinkedQueue(); @Override public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { From 0daf37aae36083d57b0a4afb406f96585003faf9 Mon Sep 17 00:00:00 2001 From: norman Date: Mon, 30 Jul 2012 08:05:25 +0200 Subject: [PATCH 5/5] Remove unused classes. See #477 --- .../internal/LegacyLinkedTransferQueue.java | 1373 ---------------- .../util/internal/LinkedTransferQueue.java | 1452 ----------------- .../io/netty/util/internal/QueueFactory.java | 69 - 3 files changed, 2894 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/internal/LegacyLinkedTransferQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/LinkedTransferQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/QueueFactory.java diff --git a/common/src/main/java/io/netty/util/internal/LegacyLinkedTransferQueue.java b/common/src/main/java/io/netty/util/internal/LegacyLinkedTransferQueue.java deleted file mode 100644 index acc9a41724..0000000000 --- a/common/src/main/java/io/netty/util/internal/LegacyLinkedTransferQueue.java +++ /dev/null @@ -1,1373 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -package io.netty.util.internal; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.LockSupport; - -/** - * - * This version does work even if sun.misc.Unsafe is not found in the classpath. So this is kept for compatibility reasons. - * Please use {@link QueueFactory} to create a Queue as it will use the "optimal" implementation depending on the JVM - * - *
- *
- * - * An unbounded {@link BlockingQueue} based on linked nodes. - * This queue orders elements FIFO (first-in-first-out) with respect - * to any given producer. The head of the queue is that - * element that has been on the queue the longest time for some - * producer. The tail of the queue is that element that has - * been on the queue the shortest time for some producer. - * - *

Beware that, unlike in most collections, the {@code size} - * method is NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements. - * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. - * - *

Memory consistency effects: As with other concurrent - * collections, actions in a thread prior to placing an object into a - * {@code LinkedTransferQueue} - * happen-before - * actions subsequent to the access or removal of that element from - * the {@code LinkedTransferQueue} in another thread. - * - *

This class is a member of the - * - * Java Collections Framework. - * @param the type of elements held in this collection - */ -public class LegacyLinkedTransferQueue extends AbstractQueue - implements BlockingQueue, java.io.Serializable { - private static final long serialVersionUID = -3223113410248163686L; - - /* - * *** Overview of Dual Queues with Slack *** - * - * Dual Queues, introduced by Scherer and Scott - * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are - * (linked) queues in which nodes may represent either data or - * requests. When a thread tries to enqueue a data node, but - * encounters a request node, it instead "matches" and removes it; - * and vice versa for enqueuing requests. Blocking Dual Queues - * arrange that threads enqueuing unmatched requests block until - * other threads provide the match. Dual Synchronous Queues (see - * Scherer, Lea, & Scott - * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) - * additionally arrange that threads enqueuing unmatched data also - * block. Dual Transfer Queues support all of these modes, as - * dictated by callers. - * - * A FIFO dual queue may be implemented using a variation of the - * Michael & Scott (M&S) lock-free queue algorithm - * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). - * It maintains two pointer fields, "head", pointing to a - * (matched) node that in turn points to the first actual - * (unmatched) queue node (or null if empty); and "tail" that - * points to the last node on the queue (or again null if - * empty). For example, here is a possible queue with four data - * elements: - * - * head tail - * | | - * v v - * M -> U -> U -> U -> U - * - * The M&S queue algorithm is known to be prone to scalability and - * overhead limitations when maintaining (via CAS) these head and - * tail pointers. This has led to the development of - * contention-reducing variants such as elimination arrays (see - * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and - * optimistic back pointers (see Ladan-Mozes & Shavit - * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). - * However, the nature of dual queues enables a simpler tactic for - * improving M&S-style implementations when dual-ness is needed. - * - * In a dual queue, each node must atomically maintain its match - * status. While there are other possible variants, we implement - * this here as: for a data-mode node, matching entails CASing an - * "item" field from a non-null data value to null upon match, and - * vice-versa for request nodes, CASing from null to a data - * value. (Note that the linearization properties of this style of - * queue are easy to verify -- elements are made available by - * linking, and unavailable by matching.) Compared to plain M&S - * queues, this property of dual queues requires one additional - * successful atomic operation per enq/deq pair. But it also - * enables lower cost variants of queue maintenance mechanics. (A - * variation of this idea applies even for non-dual queues that - * support deletion of interior elements, such as - * j.u.c.ConcurrentLinkedQueue.) - * - * Once a node is matched, its match status can never again - * change. We may thus arrange that the linked list of them - * contain a prefix of zero or more matched nodes, followed by a - * suffix of zero or more unmatched nodes. (Note that we allow - * both the prefix and suffix to be zero length, which in turn - * means that we do not use a dummy header.) If we were not - * concerned with either time or space efficiency, we could - * correctly perform enqueue and dequeue operations by traversing - * from a pointer to the initial node; CASing the item of the - * first unmatched node on match and CASing the next field of the - * trailing node on appends. (Plus some special-casing when - * initially empty). While this would be a terrible idea in - * itself, it does have the benefit of not requiring ANY atomic - * updates on head/tail fields. - * - * We introduce here an approach that lies between the extremes of - * never versus always updating queue (head and tail) pointers. - * This offers a tradeoff between sometimes requiring extra - * traversal steps to locate the first and/or last unmatched - * nodes, versus the reduced overhead and contention of fewer - * updates to queue pointers. For example, a possible snapshot of - * a queue is: - * - * head tail - * | | - * v v - * M -> M -> U -> U -> U -> U - * - * The best value for this "slack" (the targeted maximum distance - * between the value of "head" and the first unmatched node, and - * similarly for "tail") is an empirical matter. We have found - * that using very small constants in the range of 1-3 work best - * over a range of platforms. Larger values introduce increasing - * costs of cache misses and risks of long traversal chains, while - * smaller values increase CAS contention and overhead. - * - * Dual queues with slack differ from plain M&S dual queues by - * virtue of only sometimes updating head or tail pointers when - * matching, appending, or even traversing nodes; in order to - * maintain a targeted slack. The idea of "sometimes" may be - * operationalized in several ways. The simplest is to use a - * per-operation counter incremented on each traversal step, and - * to try (via CAS) to update the associated queue pointer - * whenever the count exceeds a threshold. Another, that requires - * more overhead, is to use random number generators to update - * with a given probability per traversal step. - * - * In any strategy along these lines, because CASes updating - * fields may fail, the actual slack may exceed targeted - * slack. However, they may be retried at any time to maintain - * targets. Even when using very small slack values, this - * approach works well for dual queues because it allows all - * operations up to the point of matching or appending an item - * (hence potentially allowing progress by another thread) to be - * read-only, thus not introducing any further contention. As - * described below, we implement this by performing slack - * maintenance retries only after these points. - * - * As an accompaniment to such techniques, traversal overhead can - * be further reduced without increasing contention of head - * pointer updates: Threads may sometimes shortcut the "next" link - * path from the current "head" node to be closer to the currently - * known first unmatched node, and similarly for tail. Again, this - * may be triggered with using thresholds or randomization. - * - * These ideas must be further extended to avoid unbounded amounts - * of costly-to-reclaim garbage caused by the sequential "next" - * links of nodes starting at old forgotten head nodes: As first - * described in detail by Boehm - * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC - * delays noticing that any arbitrarily old node has become - * garbage, all newer dead nodes will also be unreclaimed. - * (Similar issues arise in non-GC environments.) To cope with - * this in our implementation, upon CASing to advance the head - * pointer, we set the "next" link of the previous head to point - * only to itself; thus limiting the length of connected dead lists. - * (We also take similar care to wipe out possibly garbage - * retaining values held in other Node fields.) However, doing so - * adds some further complexity to traversal: If any "next" - * pointer links to itself, it indicates that the current thread - * has lagged behind a head-update, and so the traversal must - * continue from the "head". Traversals trying to find the - * current tail starting from "tail" may also encounter - * self-links, in which case they also continue at "head". - * - * It is tempting in slack-based scheme to not even use CAS for - * updates (similarly to Ladan-Mozes & Shavit). However, this - * cannot be done for head updates under the above link-forgetting - * mechanics because an update may leave head at a detached node. - * And while direct writes are possible for tail updates, they - * increase the risk of long retraversals, and hence long garbage - * chains, which can be much more costly than is worthwhile - * considering that the cost difference of performing a CAS vs - * write is smaller when they are not triggered on each operation - * (especially considering that writes and CASes equally require - * additional GC bookkeeping ("write barriers") that are sometimes - * more costly than the writes themselves because of contention). - * - * *** Overview of implementation *** - * - * We use a threshold-based approach to updates, with a slack - * threshold of two -- that is, we update head/tail when the - * current pointer appears to be two or more steps away from the - * first/last node. The slack value is hard-wired: a path greater - * than one is naturally implemented by checking equality of - * traversal pointers except when the list has only one element, - * in which case we keep slack threshold at one. Avoiding tracking - * explicit counts across method calls slightly simplifies an - * already-messy implementation. Using randomization would - * probably work better if there were a low-quality dirt-cheap - * per-thread one available, but even ThreadLocalRandom is too - * heavy for these purposes. - * - * With such a small slack threshold value, it is not worthwhile - * to augment this with path short-circuiting (i.e., unsplicing - * interior nodes) except in the case of cancellation/removal (see - * below). - * - * We allow both the head and tail fields to be null before any - * nodes are enqueued; initializing upon first append. This - * simplifies some other logic, as well as providing more - * efficient explicit control paths instead of letting JVMs insert - * implicit NullPointerExceptions when they are null. While not - * currently fully implemented, we also leave open the possibility - * of re-nulling these fields when empty (which is complicated to - * arrange, for little benefit.) - * - * All enqueue/dequeue operations are handled by the single method - * "xfer" with parameters indicating whether to act as some form - * of offer, put, poll, take, or transfer (each possibly with - * timeout). The relative complexity of using one monolithic - * method outweighs the code bulk and maintenance problems of - * using separate methods for each case. - * - * Operation consists of up to three phases. The first is - * implemented within method xfer, the second in tryAppend, and - * the third in method awaitMatch. - * - * 1. Try to match an existing node - * - * Starting at head, skip already-matched nodes until finding - * an unmatched node of opposite mode, if one exists, in which - * case matching it and returning, also if necessary updating - * head to one past the matched node (or the node itself if the - * list has no other unmatched nodes). If the CAS misses, then - * a loop retries advancing head by two steps until either - * success or the slack is at most two. By requiring that each - * attempt advances head by two (if applicable), we ensure that - * the slack does not grow without bound. Traversals also check - * if the initial head is now off-list, in which case they - * start at the new head. - * - * If no candidates are found and the call was untimed - * poll/offer, (argument "how" is NOW) return. - * - * 2. Try to append a new node (method tryAppend) - * - * Starting at current tail pointer, find the actual last node - * and try to append a new node (or if head was null, establish - * the first node). Nodes can be appended only if their - * predecessors are either already matched or are of the same - * mode. If we detect otherwise, then a new node with opposite - * mode must have been appended during traversal, so we must - * restart at phase 1. The traversal and update steps are - * otherwise similar to phase 1: Retrying upon CAS misses and - * checking for staleness. In particular, if a self-link is - * encountered, then we can safely jump to a node on the list - * by continuing the traversal at current head. - * - * On successful append, if the call was ASYNC, return. - * - * 3. Await match or cancellation (method awaitMatch) - * - * Wait for another thread to match node; instead cancelling if - * the current thread was interrupted or the wait timed out. On - * multiprocessors, we use front-of-queue spinning: If a node - * appears to be the first unmatched node in the queue, it - * spins a bit before blocking. In either case, before blocking - * it tries to unsplice any nodes between the current "head" - * and the first unmatched node. - * - * Front-of-queue spinning vastly improves performance of - * heavily contended queues. And so long as it is relatively - * brief and "quiet", spinning does not much impact performance - * of less-contended queues. During spins threads check their - * interrupt status and generate a thread-local random number - * to decide to occasionally perform a Thread.yield. While - * yield has underdefined specs, we assume that might it help, - * and will not hurt in limiting impact of spinning on busy - * systems. We also use smaller (1/2) spins for nodes that are - * not known to be front but whose predecessors have not - * blocked -- these "chained" spins avoid artifacts of - * front-of-queue rules which otherwise lead to alternating - * nodes spinning vs blocking. Further, front threads that - * represent phase changes (from data to request node or vice - * versa) compared to their predecessors receive additional - * chained spins, reflecting longer paths typically required to - * unblock threads during phase changes. - * ** Unlinking removed interior nodes ** - * - * In addition to minimizing garbage retention via self-linking - * described above, we also unlink removed interior nodes. These - * may arise due to timed out or interrupted waits, or calls to - * remove(x) or Iterator.remove. Normally, given a node that was - * at one time known to be the predecessor of some node s that is - * to be removed, we can unsplice s by CASing the next field of - * its predecessor if it still points to s (otherwise s must - * already have been removed or is now offlist). But there are two - * situations in which we cannot guarantee to make node s - * unreachable in this way: (1) If s is the trailing node of list - * (i.e., with null next), then it is pinned as the target node - * for appends, so can only be removed later after other nodes are - * appended. (2) We cannot necessarily unlink s given a - * predecessor node that is matched (including the case of being - * cancelled): the predecessor may already be unspliced, in which - * case some previous reachable node may still point to s. - * (For further explanation see Herlihy & Shavit "The Art of - * Multiprocessor Programming" chapter 9). Although, in both - * cases, we can rule out the need for further action if either s - * or its predecessor are (or can be made to be) at, or fall off - * from, the head of list. - * - * Without taking these into account, it would be possible for an - * unbounded number of supposedly removed nodes to remain - * reachable. Situations leading to such buildup are uncommon but - * can occur in practice; for example when a series of short timed - * calls to poll repeatedly time out but never otherwise fall off - * the list because of an untimed call to take at the front of the - * queue. - * - * When these cases arise, rather than always retraversing the - * entire list to find an actual predecessor to unlink (which - * won't help for case (1) anyway), we record a conservative - * estimate of possible unsplice failures (in "sweepVotes"). - * We trigger a full sweep when the estimate exceeds a threshold - * ("SWEEP_THRESHOLD") indicating the maximum number of estimated - * removal failures to tolerate before sweeping through, unlinking - * cancelled nodes that were not unlinked upon initial removal. - * We perform sweeps by the thread hitting threshold (rather than - * background threads or by spreading work to other threads) - * because in the main contexts in which removal occurs, the - * caller is already timed-out, cancelled, or performing a - * potentially O(n) operation (e.g. remove(x)), none of which are - * time-critical enough to warrant the overhead that alternatives - * would impose on other threads. - * - * Because the sweepVotes estimate is conservative, and because - * nodes become unlinked "naturally" as they fall off the head of - * the queue, and because we allow votes to accumulate even while - * sweeps are in progress, there are typically significantly fewer - * such nodes than estimated. Choice of a threshold value - * balances the likelihood of wasted effort and contention, versus - * providing a worst-case bound on retention of interior nodes in - * quiescent queues. The value defined below was chosen - * empirically to balance these under various timeout scenarios. - * - * Note that we cannot self-link unlinked interior nodes during - * sweeps. However, the associated garbage chains terminate when - * some successor ultimately falls off the head of the list and is - * self-linked. - */ - - /** True if on multiprocessor */ - private static final boolean MP = - Runtime.getRuntime().availableProcessors() > 1; - - /** - * The number of times to spin (with randomly interspersed calls - * to Thread.yield) on multiprocessor before blocking when a node - * is apparently the first waiter in the queue. See above for - * explanation. Must be a power of two. The value is empirically - * derived -- it works pretty well across a variety of processors, - * numbers of CPUs, and OSes. - */ - private static final int FRONT_SPINS = 1 << 7; - - /** - * The number of times to spin before blocking when a node is - * preceded by another node that is apparently spinning. Also - * serves as an increment to FRONT_SPINS on phase changes, and as - * base average frequency for yielding during spins. Must be a - * power of two. - */ - private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; - - /** - * The maximum number of estimated removal failures (sweepVotes) - * to tolerate before sweeping through the queue unlinking - * cancelled nodes that were not unlinked upon initial - * removal. See above for explanation. The value must be at least - * two to avoid useless sweeps when removing trailing nodes. - */ - static final int SWEEP_THRESHOLD = 32; - - /** - * Queue nodes. Uses Object, not E, for items to allow forgetting - * them after use. Relies heavily on Unsafe mechanics to minimize - * unnecessary ordering constraints: Writes that are intrinsically - * ordered wrt other accesses or CASes use simple relaxed forms. - */ - static final class Node { - final boolean isData; // false if this is a request node - volatile Object item; // initially non-null if isData; CASed to match - volatile Node next; - volatile Thread waiter; // null until waiting - - // CAS methods for fields - boolean casNext(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return nextUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (next == cmp) { - next = val; - return true; - } else { - return false; - } - } - } - } - - boolean casItem(Object cmp, Object val) { - // assert cmp == null || cmp.getClass() != Node.class; - if (AtomicFieldUpdaterUtil.isAvailable()) { - return itemUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (item == cmp) { - item = val; - return true; - } else { - return false; - } - } - } - } - - /** - * Constructs a new node. Uses relaxed write because item can - * only be seen after publication via casNext. - */ - Node(Object item, boolean isData) { - this.item = item; - this.isData = isData; - } - - /** - * Links node to itself to avoid garbage retention. Called - * only after CASing head field, so uses relaxed write. - */ - void forgetNext() { - this.next = this; - } - - /** - * Sets item to self and waiter to null, to avoid garbage - * retention after matching or cancelling. Uses relaxed writes - * bacause order is already constrained in the only calling - * contexts: item is forgotten only after volatile/atomic - * mechanics that extract items. Similarly, clearing waiter - * follows either CAS or return from park (if ever parked; - * else we don't care). - */ - void forgetContents() { - this.item = this; - this.waiter = null; - } - - /** - * Returns true if this node has been matched, including the - * case of artificial matches due to cancellation. - */ - boolean isMatched() { - Object x = item; - return x == this || x == null == isData; - } - - /** - * Returns true if this is an unmatched request node. - */ - boolean isUnmatchedRequest() { - return !isData && item == null; - } - - /** - * Returns true if a node with the given mode cannot be - * appended to this node because this node is unmatched and - * has opposite data mode. - */ - boolean cannotPrecede(boolean haveData) { - boolean d = isData; - Object x; - return d != haveData && (x = item) != this && x != null == d; - } - - /** - * Tries to artificially match a data node -- used by remove. - */ - boolean tryMatchData() { - // assert isData; - Object x = item; - if (x != null && x != this && casItem(x, null)) { - LockSupport.unpark(waiter); - return true; - } - return false; - } - - private static final AtomicReferenceFieldUpdater nextUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Node.class, "next"); - private static final AtomicReferenceFieldUpdater itemUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(Node.class, Object.class, "item"); - - } - - /** head of the queue; null until first enqueue */ - transient volatile Node head; - - /** tail of the queue; null until first append */ - transient volatile Node tail; - - /** The number of apparent failures to unsplice removed nodes */ - transient volatile int sweepVotes; - - // CAS methods for fields - private boolean casTail(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return tailUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (tail == cmp) { - tail = val; - return true; - } else { - return false; - } - } - } - } - - private boolean casHead(Node cmp, Node val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return headUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (head == cmp) { - head = val; - return true; - } else { - return false; - } - } - } - } - - private boolean casSweepVotes(int cmp, int val) { - if (AtomicFieldUpdaterUtil.isAvailable()) { - return sweepVotesUpdater.compareAndSet(this, cmp, val); - } else { - synchronized (this) { - if (sweepVotes == cmp) { - sweepVotes = val; - return true; - } else { - return false; - } - } - } - } - - /* - * Possible values for "how" argument in xfer method. - */ - private static final int NOW = 0; // for untimed poll, tryTransfer - private static final int ASYNC = 1; // for offer, put, add - private static final int SYNC = 2; // for transfer, take - private static final int TIMED = 3; // for timed poll, tryTransfer - - @SuppressWarnings("unchecked") - static E cast(Object item) { - // assert item == null || item.getClass() != Node.class; - // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954 - return (E) item; - } - - /** - * Implements all queuing methods. See above for explanation. - * - * @param e the item or null for take - * @param haveData true if this is a put, else a take - * @param how NOW, ASYNC, SYNC, or TIMED - * @param nanos timeout in nanosecs, used only if mode is TIMED - * @return an item if matched, else e - * @throws NullPointerException if haveData mode but e is null - */ - @SuppressWarnings("unchecked") - private E xfer(E e, boolean haveData, int how, long nanos) { - if (haveData && e == null) { - throw new NullPointerException(); - } - Node s = null; // the node to append, if needed - - retry: for (;;) { // restart on append race - - for (Node h = head, p = h; p != null;) { // find & match first node - boolean isData = p.isData; - Object item = p.item; - if (item != p && item != null == isData) { // unmatched - if (isData == haveData) { // can't match - break; - } - if (p.casItem(item, e)) { // match - for (Node q = p; q != h;) { - Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null? q : n)) { - h.forgetNext(); - break; - } // advance and retry - if ((h = head) == null || - (q = h.next) == null || !q.isMatched()) { - break; // unless slack < 2 - } - } - LockSupport.unpark(p.waiter); - // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954 - return (E) LegacyLinkedTransferQueue.cast(item); - } - } - Node n = p.next; - p = p != n ? n : (h = head); // Use head if p offlist - } - - if (how != NOW) { // No matches available - if (s == null) { - s = new Node(e, haveData); - } - Node pred = tryAppend(s, haveData); - if (pred == null) { - continue retry; // lost race vs opposite mode - } - if (how != ASYNC) { - return awaitMatch(s, pred, e, how == TIMED, nanos); - } - } - return e; // not waiting - } - } - - /** - * Tries to append node s as tail. - * - * @param s the node to append - * @param haveData true if appending in data mode - * @return null on failure due to losing race with append in - * different mode, else s's predecessor, or s itself if no - * predecessor - */ - private Node tryAppend(Node s, boolean haveData) { - for (Node t = tail, p = t;;) { // move p to last node and append - Node n, u; // temps for reads of next & tail - if (p == null && (p = head) == null) { - if (casHead(null, s)) { - return s; // initialize - } - } - else if (p.cannotPrecede(haveData)) { - return null; // lost race vs opposite mode - } else if ((n = p.next) != null) { // not last; keep traversing - p = p != t && t != (u = tail) ? (t = u) : // stale tail - p != n ? n : null; // restart if off list - } else if (!p.casNext(null, s)) { - p = p.next; // re-read on CAS failure - } else { - if (p != t) { // update if slack now >= 2 - while ((tail != t || !casTail(t, s)) && - (t = tail) != null && - (s = t.next) != null && // advance and retry - (s = s.next) != null && s != t) { - continue; - } - } - return p; - } - } - } - - /** - * Spins/yields/blocks until node s is matched or caller gives up. - * - * @param s the waiting node - * @param pred the predecessor of s, or s itself if it has no - * predecessor, or null if unknown (the null case does not occur - * in any current calls but may in possible future extensions) - * @param e the comparison value for checking match - * @param timed if true, wait only until timeout elapses - * @param nanos timeout in nanosecs, used only if timed is true - * @return matched item, or e if unmatched on interrupt or timeout - */ - @SuppressWarnings("unchecked") - private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { - long lastTime = timed ? System.nanoTime() : 0L; - Thread w = Thread.currentThread(); - int spins = -1; // initialized after first item and cancel checks - ThreadLocalRandom randomYields = null; // bound if needed - - for (;;) { - Object item = s.item; - if (item != e) { // matched - // assert item != s; - s.forgetContents(); // avoid garbage - // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954 - return (E) LegacyLinkedTransferQueue.cast(item); - } - if ((w.isInterrupted() || timed && nanos <= 0) && - s.casItem(e, s)) { // cancel - unsplice(pred, s); - return e; - } - - if (spins < 0) { // establish spins at/near front - if ((spins = spinsFor(pred, s.isData)) > 0) { - randomYields = ThreadLocalRandom.current(); - } - } - else if (spins > 0) { // spin - --spins; - if (randomYields.nextInt(CHAINED_SPINS) == 0) { - Thread.yield(); // occasionally yield - } - } - else if (s.waiter == null) { - s.waiter = w; // request unpark then recheck - } - else if (timed) { - long now = System.nanoTime(); - if ((nanos -= now - lastTime) > 0) { - LockSupport.parkNanos(nanos); - } - lastTime = now; - } - else { - LockSupport.park(); - } - } - } - - /** - * Returns spin/yield value for a node with given predecessor and - * data mode. See above for explanation. - */ - private static int spinsFor(Node pred, boolean haveData) { - if (MP && pred != null) { - if (pred.isData != haveData) { // phase change - return FRONT_SPINS + CHAINED_SPINS; - } - if (pred.isMatched()) { // probably at front - return FRONT_SPINS; - } - if (pred.waiter == null) { // pred apparently spinning - return CHAINED_SPINS; - } - } - return 0; - } - - /* -------------- Traversal methods -------------- */ - - /** - * Returns the successor of p, or the head node if p.next has been - * linked to self, which will only be true if traversing with a - * stale pointer that is now off the list. - */ - final Node succ(Node p) { - Node next = p.next; - return p == next ? head : next; - } - - /** - * Returns the first unmatched node of the given mode, or null if - * none. Used by methods isEmpty, hasWaitingConsumer. - */ - private Node firstOfMode(boolean isData) { - for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { - return p.isData == isData ? p : null; - } - } - return null; - } - - /** - * Returns the item in the first unmatched node with isData; or - * null if none. Used by peek. - */ - @SuppressWarnings("unchecked") - private E firstDataItem() { - for (Node p = head; p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { - // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954 - return (E) LegacyLinkedTransferQueue.cast(item); - } - } - else if (item == null) { - return null; - } - } - return null; - } - - /** - * Traverses and counts unmatched nodes of the given mode. - * Used by methods size and getWaitingConsumerCount. - */ - private int countOfMode(boolean data) { - int count = 0; - for (Node p = head; p != null; ) { - if (!p.isMatched()) { - if (p.isData != data) { - return 0; - } - if (++count == Integer.MAX_VALUE) { // saturated - break; - } - } - Node n = p.next; - if (n != p) { - p = n; - } else { - count = 0; - p = head; - } - } - return count; - } - - final class Itr implements Iterator { - private Node nextNode; // next node to return item for - private E nextItem; // the corresponding item - private Node lastRet; // last returned node, to support remove - private Node lastPred; // predecessor to unlink lastRet - - /** - * Moves to next node after prev, or first node if prev null. - */ - @SuppressWarnings("unchecked") - private void advance(Node prev) { - lastPred = lastRet; - lastRet = prev; - for (Node p = prev == null ? head : succ(prev); - p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { - // Explicit cast, see http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6302954 - nextItem = (E) LegacyLinkedTransferQueue.cast(item); - nextNode = p; - return; - } - } - else if (item == null) { - break; - } - } - nextNode = null; - } - - Itr() { - advance(null); - } - - @Override - public boolean hasNext() { - return nextNode != null; - } - - @Override - public E next() { - Node p = nextNode; - if (p == null) { - throw new NoSuchElementException(); - } - E e = nextItem; - advance(p); - return e; - } - - @Override - public void remove() { - Node p = lastRet; - if (p == null) { - throw new IllegalStateException(); - } - if (p.tryMatchData()) { - unsplice(lastPred, p); - } - } - } - - /* -------------- Removal methods -------------- */ - - /** - * Unsplices (now or later) the given deleted/cancelled node with - * the given predecessor. - * - * @param pred a node that was at one time known to be the - * predecessor of s, or null or s itself if s is/was at head - * @param s the node to be unspliced - */ - final void unsplice(Node pred, Node s) { - s.forgetContents(); // forget unneeded fields - /* - * See above for rationale. Briefly: if pred still points to - * s, try to unlink s. If s cannot be unlinked, because it is - * trailing node or pred might be unlinked, and neither pred - * nor s are head or offlist, add to sweepVotes, and if enough - * votes have accumulated, sweep. - */ - if (pred != null && pred != s && pred.next == s) { - Node n = s.next; - if (n == null || - n != s && pred.casNext(s, n) && pred.isMatched()) { - for (;;) { // check if at, or could be, head - Node h = head; - if (h == pred || h == s || h == null) { - return; // at head or list empty - } - if (!h.isMatched()) { - break; - } - Node hn = h.next; - if (hn == null) { - return; // now empty - } - if (hn != h && casHead(h, hn)) { - h.forgetNext(); // advance head - } - } - if (pred.next != pred && s.next != s) { // recheck if offlist - for (;;) { // sweep now if enough votes - int v = sweepVotes; - if (v < SWEEP_THRESHOLD) { - if (casSweepVotes(v, v + 1)) { - break; - } - } - else if (casSweepVotes(v, 0)) { - sweep(); - break; - } - } - } - } - } - } - - /** - * Unlinks matched (typically cancelled) nodes encountered in a - * traversal from head. - */ - private void sweep() { - for (Node p = head, s, n; p != null && (s = p.next) != null; ) { - if (!s.isMatched()) { - // Unmatched nodes are never self-linked - p = s; - } else if ((n = s.next) == null) { // trailing node is pinned - break; - } else if (s == n) { // stale - // No need to also check for p == s, since that implies s == n - p = head; - } else { - p.casNext(s, n); - } - } - } - - /** - * Main implementation of remove(Object) - */ - private boolean findAndRemove(Object e) { - if (e != null) { - for (Node pred = null, p = head; p != null; ) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p && e.equals(item) && - p.tryMatchData()) { - unsplice(pred, p); - return true; - } - } - else if (item == null) { - break; - } - pred = p; - if ((p = p.next) == pred) { // stale - pred = null; - p = head; - } - } - } - return false; - } - - - /** - * Creates an initially empty {@code LinkedTransferQueue}. - */ - public LegacyLinkedTransferQueue() { - } - - /** - * Creates a {@code LinkedTransferQueue} - * initially containing the elements of the given collection, - * added in traversal order of the collection's iterator. - * - * @param c the collection of elements to initially contain - * @throws NullPointerException if the specified collection or any - * of its elements are null - */ - public LegacyLinkedTransferQueue(Collection c) { - this(); - addAll(c); - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never block. - * - * @throws NullPointerException if the specified element is null - */ - @Override - public void put(E e) { - xfer(e, true, ASYNC, 0); - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never block or - * return {@code false}. - * - * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean offer(E e, long timeout, TimeUnit unit) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never return {@code false}. - * - * @return {@code true} (as specified by - * {@link BlockingQueue#offer(Object) BlockingQueue.offer}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean offer(E e) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never throw - * {@link IllegalStateException} or return {@code false}. - * - * @return {@code true} (as specified by {@link Collection#add}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean add(E e) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Transfers the element to a waiting consumer immediately, if possible. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * otherwise returning {@code false} without enqueuing the element. - * - * @throws NullPointerException if the specified element is null - */ - public boolean tryTransfer(E e) { - return xfer(e, true, NOW, 0) == null; - } - - /** - * Transfers the element to a consumer, waiting if necessary to do so. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * else inserts the specified element at the tail of this queue - * and waits until the element is received by a consumer. - * - * @throws NullPointerException if the specified element is null - */ - public void transfer(E e) throws InterruptedException { - if (xfer(e, true, SYNC, 0) != null) { - Thread.interrupted(); // failure possible only due to interrupt - throw new InterruptedException(); - } - } - - /** - * Transfers the element to a consumer if it is possible to do so - * before the timeout elapses. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * else inserts the specified element at the tail of this queue - * and waits until the element is received by a consumer, - * returning {@code false} if the specified wait time elapses - * before the element can be transferred. - * - * @throws NullPointerException if the specified element is null - */ - public boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) { - return true; - } - if (!Thread.interrupted()) { - return false; - } - throw new InterruptedException(); - } - - @Override - public E take() throws InterruptedException { - E e = xfer(null, false, SYNC, 0); - if (e != null) { - return e; - } - Thread.interrupted(); - throw new InterruptedException(); - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - E e = xfer(null, false, TIMED, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) { - return e; - } - throw new InterruptedException(); - } - - @Override - public E poll() { - return xfer(null, false, NOW, 0); - } - - /** - * @throws NullPointerException {@inheritDoc} - * @throws IllegalArgumentException {@inheritDoc} - */ - @Override - public int drainTo(Collection c) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - E e; - while ( (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - /** - * @throws NullPointerException {@inheritDoc} - * @throws IllegalArgumentException {@inheritDoc} - */ - @Override - public int drainTo(Collection c, int maxElements) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - E e; - while (n < maxElements && (e = poll()) != null) { - c.add(e); - ++n; - } - return n; - } - - /** - * Returns an iterator over the elements in this queue in proper - * sequence, from head to tail. - * - *

The returned iterator is a "weakly consistent" iterator that - * will never throw - * {@link ConcurrentModificationException ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed - * to) reflect any modifications subsequent to construction. - * - * @return an iterator over the elements in this queue in proper sequence - */ - @Override - public Iterator iterator() { - return new Itr(); - } - - @Override - public E peek() { - return firstDataItem(); - } - - /** - * Returns {@code true} if this queue contains no elements. - * - * @return {@code true} if this queue contains no elements - */ - @Override - public boolean isEmpty() { - for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { - return !p.isData; - } - } - return true; - } - - public boolean hasWaitingConsumer() { - return firstOfMode(false) != null; - } - - /** - * Returns the number of elements in this queue. If this queue - * contains more than {@code Integer.MAX_VALUE} elements, returns - * {@code Integer.MAX_VALUE}. - * - *

Beware that, unlike in most collections, this method is - * NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current - * number of elements requires an O(n) traversal. - * - * @return the number of elements in this queue - */ - @Override - public int size() { - return countOfMode(true); - } - - public int getWaitingConsumerCount() { - return countOfMode(false); - } - - /** - * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element {@code e} such - * that {@code o.equals(e)}, if this queue contains one or more such - * elements. - * Returns {@code true} if this queue contained the specified element - * (or equivalently, if this queue changed as a result of the call). - * - * @param o element to be removed from this queue, if present - * @return {@code true} if this queue changed as a result of the call - */ - @Override - public boolean remove(Object o) { - return findAndRemove(o); - } - - /** - * Always returns {@code Integer.MAX_VALUE} because a - * {@code LinkedTransferQueue} is not capacity constrained. - * - * @return {@code Integer.MAX_VALUE} (as specified by - * {@link BlockingQueue#remainingCapacity()}) - */ - @Override - public int remainingCapacity() { - return Integer.MAX_VALUE; - } - - /** - * Saves the state to a stream (that is, serializes it). - * - * @serialData All of the elements (each an {@code E}) in - * the proper order, followed by a null - * @param s the stream - */ - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - for (E e : this) { - s.writeObject(e); - } - // Use trailing null as sentinel - s.writeObject(null); - } - - /** - * Reconstitutes the Queue instance from a stream (that is, - * deserializes it). - * - * @param s the stream - */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - for (;;) { - E item = (E) s.readObject(); - if (item == null) { - break; - } else { - offer(item); - } - } - } - - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater headUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(LegacyLinkedTransferQueue.class, Node.class, "head"); - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater tailUpdater = - AtomicFieldUpdaterUtil.newRefUpdater(LegacyLinkedTransferQueue.class, Node.class, "tail"); - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater sweepVotesUpdater = - AtomicFieldUpdaterUtil.newIntUpdater(LegacyLinkedTransferQueue.class, "sweepVotes"); -} - diff --git a/common/src/main/java/io/netty/util/internal/LinkedTransferQueue.java b/common/src/main/java/io/netty/util/internal/LinkedTransferQueue.java deleted file mode 100644 index e188ce572b..0000000000 --- a/common/src/main/java/io/netty/util/internal/LinkedTransferQueue.java +++ /dev/null @@ -1,1452 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package io.netty.util.internal; - -import java.util.AbstractQueue; -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; - -/** - * This class is a copied from URL revision 1.91 - *
- * The only difference is that it replace {@link BlockingQueue} and any reference to the TransferQueue interface was removed - *
- * - * - * Please use {@link QueueFactory} to create a Queue as it will use the "optimal" implementation depending on the JVM - * - *
- *
- * - * An unbounded {@link BlockingQueue} based on linked nodes. - * This queue orders elements FIFO (first-in-first-out) with respect - * to any given producer. The head of the queue is that - * element that has been on the queue the longest time for some - * producer. The tail of the queue is that element that has - * been on the queue the shortest time for some producer. - * - *

Beware that, unlike in most collections, the {@code size} method - * is NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current number - * of elements requires a traversal of the elements, and so may report - * inaccurate results if this collection is modified during traversal. - * Additionally, the bulk operations {@code addAll}, - * {@code removeAll}, {@code retainAll}, {@code containsAll}, - * {@code equals}, and {@code toArray} are not guaranteed - * to be performed atomically. For example, an iterator operating - * concurrently with an {@code addAll} operation might view only some - * of the added elements. - * - *

This class and its iterator implement all of the - * optional methods of the {@link Collection} and {@link - * Iterator} interfaces. - * - *

Memory consistency effects: As with other concurrent - * collections, actions in a thread prior to placing an object into a - * {@code LinkedTransferQueue} - * happen-before - * actions subsequent to the access or removal of that element from - * the {@code LinkedTransferQueue} in another thread. - * - *

This class is a member of the - * - * Java Collections Framework. - * - * @since 1.7 - - * @param the type of elements held in this collection - */ -@SuppressWarnings("restriction") -public class LinkedTransferQueue extends AbstractQueue - implements BlockingQueue, java.io.Serializable { - private static final long serialVersionUID = -3223113410248163686L; - - /* - * *** Overview of Dual Queues with Slack *** - * - * Dual Queues, introduced by Scherer and Scott - * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are - * (linked) queues in which nodes may represent either data or - * requests. When a thread tries to enqueue a data node, but - * encounters a request node, it instead "matches" and removes it; - * and vice versa for enqueuing requests. Blocking Dual Queues - * arrange that threads enqueuing unmatched requests block until - * other threads provide the match. Dual Synchronous Queues (see - * Scherer, Lea, & Scott - * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf) - * additionally arrange that threads enqueuing unmatched data also - * block. Dual Transfer Queues support all of these modes, as - * dictated by callers. - * - * A FIFO dual queue may be implemented using a variation of the - * Michael & Scott (M&S) lock-free queue algorithm - * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf). - * It maintains two pointer fields, "head", pointing to a - * (matched) node that in turn points to the first actual - * (unmatched) queue node (or null if empty); and "tail" that - * points to the last node on the queue (or again null if - * empty). For example, here is a possible queue with four data - * elements: - * - * head tail - * | | - * v v - * M -> U -> U -> U -> U - * - * The M&S queue algorithm is known to be prone to scalability and - * overhead limitations when maintaining (via CAS) these head and - * tail pointers. This has led to the development of - * contention-reducing variants such as elimination arrays (see - * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and - * optimistic back pointers (see Ladan-Mozes & Shavit - * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf). - * However, the nature of dual queues enables a simpler tactic for - * improving M&S-style implementations when dual-ness is needed. - * - * In a dual queue, each node must atomically maintain its match - * status. While there are other possible variants, we implement - * this here as: for a data-mode node, matching entails CASing an - * "item" field from a non-null data value to null upon match, and - * vice-versa for request nodes, CASing from null to a data - * value. (Note that the linearization properties of this style of - * queue are easy to verify -- elements are made available by - * linking, and unavailable by matching.) Compared to plain M&S - * queues, this property of dual queues requires one additional - * successful atomic operation per enq/deq pair. But it also - * enables lower cost variants of queue maintenance mechanics. (A - * variation of this idea applies even for non-dual queues that - * support deletion of interior elements, such as - * j.u.c.ConcurrentLinkedQueue.) - * - * Once a node is matched, its match status can never again - * change. We may thus arrange that the linked list of them - * contain a prefix of zero or more matched nodes, followed by a - * suffix of zero or more unmatched nodes. (Note that we allow - * both the prefix and suffix to be zero length, which in turn - * means that we do not use a dummy header.) If we were not - * concerned with either time or space efficiency, we could - * correctly perform enqueue and dequeue operations by traversing - * from a pointer to the initial node; CASing the item of the - * first unmatched node on match and CASing the next field of the - * trailing node on appends. (Plus some special-casing when - * initially empty). While this would be a terrible idea in - * itself, it does have the benefit of not requiring ANY atomic - * updates on head/tail fields. - * - * We introduce here an approach that lies between the extremes of - * never versus always updating queue (head and tail) pointers. - * This offers a tradeoff between sometimes requiring extra - * traversal steps to locate the first and/or last unmatched - * nodes, versus the reduced overhead and contention of fewer - * updates to queue pointers. For example, a possible snapshot of - * a queue is: - * - * head tail - * | | - * v v - * M -> M -> U -> U -> U -> U - * - * The best value for this "slack" (the targeted maximum distance - * between the value of "head" and the first unmatched node, and - * similarly for "tail") is an empirical matter. We have found - * that using very small constants in the range of 1-3 work best - * over a range of platforms. Larger values introduce increasing - * costs of cache misses and risks of long traversal chains, while - * smaller values increase CAS contention and overhead. - * - * Dual queues with slack differ from plain M&S dual queues by - * virtue of only sometimes updating head or tail pointers when - * matching, appending, or even traversing nodes; in order to - * maintain a targeted slack. The idea of "sometimes" may be - * operationalized in several ways. The simplest is to use a - * per-operation counter incremented on each traversal step, and - * to try (via CAS) to update the associated queue pointer - * whenever the count exceeds a threshold. Another, that requires - * more overhead, is to use random number generators to update - * with a given probability per traversal step. - * - * In any strategy along these lines, because CASes updating - * fields may fail, the actual slack may exceed targeted - * slack. However, they may be retried at any time to maintain - * targets. Even when using very small slack values, this - * approach works well for dual queues because it allows all - * operations up to the point of matching or appending an item - * (hence potentially allowing progress by another thread) to be - * read-only, thus not introducing any further contention. As - * described below, we implement this by performing slack - * maintenance retries only after these points. - * - * As an accompaniment to such techniques, traversal overhead can - * be further reduced without increasing contention of head - * pointer updates: Threads may sometimes shortcut the "next" link - * path from the current "head" node to be closer to the currently - * known first unmatched node, and similarly for tail. Again, this - * may be triggered with using thresholds or randomization. - * - * These ideas must be further extended to avoid unbounded amounts - * of costly-to-reclaim garbage caused by the sequential "next" - * links of nodes starting at old forgotten head nodes: As first - * described in detail by Boehm - * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC - * delays noticing that any arbitrarily old node has become - * garbage, all newer dead nodes will also be unreclaimed. - * (Similar issues arise in non-GC environments.) To cope with - * this in our implementation, upon CASing to advance the head - * pointer, we set the "next" link of the previous head to point - * only to itself; thus limiting the length of connected dead lists. - * (We also take similar care to wipe out possibly garbage - * retaining values held in other Node fields.) However, doing so - * adds some further complexity to traversal: If any "next" - * pointer links to itself, it indicates that the current thread - * has lagged behind a head-update, and so the traversal must - * continue from the "head". Traversals trying to find the - * current tail starting from "tail" may also encounter - * self-links, in which case they also continue at "head". - * - * It is tempting in slack-based scheme to not even use CAS for - * updates (similarly to Ladan-Mozes & Shavit). However, this - * cannot be done for head updates under the above link-forgetting - * mechanics because an update may leave head at a detached node. - * And while direct writes are possible for tail updates, they - * increase the risk of long retraversals, and hence long garbage - * chains, which can be much more costly than is worthwhile - * considering that the cost difference of performing a CAS vs - * write is smaller when they are not triggered on each operation - * (especially considering that writes and CASes equally require - * additional GC bookkeeping ("write barriers") that are sometimes - * more costly than the writes themselves because of contention). - * - * *** Overview of implementation *** - * - * We use a threshold-based approach to updates, with a slack - * threshold of two -- that is, we update head/tail when the - * current pointer appears to be two or more steps away from the - * first/last node. The slack value is hard-wired: a path greater - * than one is naturally implemented by checking equality of - * traversal pointers except when the list has only one element, - * in which case we keep slack threshold at one. Avoiding tracking - * explicit counts across method calls slightly simplifies an - * already-messy implementation. Using randomization would - * probably work better if there were a low-quality dirt-cheap - * per-thread one available, but even ThreadLocalRandom is too - * heavy for these purposes. - * - * With such a small slack threshold value, it is not worthwhile - * to augment this with path short-circuiting (i.e., unsplicing - * interior nodes) except in the case of cancellation/removal (see - * below). - * - * We allow both the head and tail fields to be null before any - * nodes are enqueued; initializing upon first append. This - * simplifies some other logic, as well as providing more - * efficient explicit control paths instead of letting JVMs insert - * implicit NullPointerExceptions when they are null. While not - * currently fully implemented, we also leave open the possibility - * of re-nulling these fields when empty (which is complicated to - * arrange, for little benefit.) - * - * All enqueue/dequeue operations are handled by the single method - * "xfer" with parameters indicating whether to act as some form - * of offer, put, poll, take, or transfer (each possibly with - * timeout). The relative complexity of using one monolithic - * method outweighs the code bulk and maintenance problems of - * using separate methods for each case. - * - * Operation consists of up to three phases. The first is - * implemented within method xfer, the second in tryAppend, and - * the third in method awaitMatch. - * - * 1. Try to match an existing node - * - * Starting at head, skip already-matched nodes until finding - * an unmatched node of opposite mode, if one exists, in which - * case matching it and returning, also if necessary updating - * head to one past the matched node (or the node itself if the - * list has no other unmatched nodes). If the CAS misses, then - * a loop retries advancing head by two steps until either - * success or the slack is at most two. By requiring that each - * attempt advances head by two (if applicable), we ensure that - * the slack does not grow without bound. Traversals also check - * if the initial head is now off-list, in which case they - * start at the new head. - * - * If no candidates are found and the call was untimed - * poll/offer, (argument "how" is NOW) return. - * - * 2. Try to append a new node (method tryAppend) - * - * Starting at current tail pointer, find the actual last node - * and try to append a new node (or if head was null, establish - * the first node). Nodes can be appended only if their - * predecessors are either already matched or are of the same - * mode. If we detect otherwise, then a new node with opposite - * mode must have been appended during traversal, so we must - * restart at phase 1. The traversal and update steps are - * otherwise similar to phase 1: Retrying upon CAS misses and - * checking for staleness. In particular, if a self-link is - * encountered, then we can safely jump to a node on the list - * by continuing the traversal at current head. - * - * On successful append, if the call was ASYNC, return. - * - * 3. Await match or cancellation (method awaitMatch) - * - * Wait for another thread to match node; instead cancelling if - * the current thread was interrupted or the wait timed out. On - * multiprocessors, we use front-of-queue spinning: If a node - * appears to be the first unmatched node in the queue, it - * spins a bit before blocking. In either case, before blocking - * it tries to unsplice any nodes between the current "head" - * and the first unmatched node. - * - * Front-of-queue spinning vastly improves performance of - * heavily contended queues. And so long as it is relatively - * brief and "quiet", spinning does not much impact performance - * of less-contended queues. During spins threads check their - * interrupt status and generate a thread-local random number - * to decide to occasionally perform a Thread.yield. While - * yield has underdefined specs, we assume that it might help, - * and will not hurt, in limiting impact of spinning on busy - * systems. We also use smaller (1/2) spins for nodes that are - * not known to be front but whose predecessors have not - * blocked -- these "chained" spins avoid artifacts of - * front-of-queue rules which otherwise lead to alternating - * nodes spinning vs blocking. Further, front threads that - * represent phase changes (from data to request node or vice - * versa) compared to their predecessors receive additional - * chained spins, reflecting longer paths typically required to - * unblock threads during phase changes. - * ** Unlinking removed interior nodes ** - * - * In addition to minimizing garbage retention via self-linking - * described above, we also unlink removed interior nodes. These - * may arise due to timed out or interrupted waits, or calls to - * remove(x) or Iterator.remove. Normally, given a node that was - * at one time known to be the predecessor of some node s that is - * to be removed, we can unsplice s by CASing the next field of - * its predecessor if it still points to s (otherwise s must - * already have been removed or is now offlist). But there are two - * situations in which we cannot guarantee to make node s - * unreachable in this way: (1) If s is the trailing node of list - * (i.e., with null next), then it is pinned as the target node - * for appends, so can only be removed later after other nodes are - * appended. (2) We cannot necessarily unlink s given a - * predecessor node that is matched (including the case of being - * cancelled): the predecessor may already be unspliced, in which - * case some previous reachable node may still point to s. - * (For further explanation see Herlihy & Shavit "The Art of - * Multiprocessor Programming" chapter 9). Although, in both - * cases, we can rule out the need for further action if either s - * or its predecessor are (or can be made to be) at, or fall off - * from, the head of list. - * - * Without taking these into account, it would be possible for an - * unbounded number of supposedly removed nodes to remain - * reachable. Situations leading to such buildup are uncommon but - * can occur in practice; for example when a series of short timed - * calls to poll repeatedly time out but never otherwise fall off - * the list because of an untimed call to take at the front of the - * queue. - * - * When these cases arise, rather than always retraversing the - * entire list to find an actual predecessor to unlink (which - * won't help for case (1) anyway), we record a conservative - * estimate of possible unsplice failures (in "sweepVotes"). - * We trigger a full sweep when the estimate exceeds a threshold - * ("SWEEP_THRESHOLD") indicating the maximum number of estimated - * removal failures to tolerate before sweeping through, unlinking - * cancelled nodes that were not unlinked upon initial removal. - * We perform sweeps by the thread hitting threshold (rather than - * background threads or by spreading work to other threads) - * because in the main contexts in which removal occurs, the - * caller is already timed-out, cancelled, or performing a - * potentially O(n) operation (e.g. remove(x)), none of which are - * time-critical enough to warrant the overhead that alternatives - * would impose on other threads. - * - * Because the sweepVotes estimate is conservative, and because - * nodes become unlinked "naturally" as they fall off the head of - * the queue, and because we allow votes to accumulate even while - * sweeps are in progress, there are typically significantly fewer - * such nodes than estimated. Choice of a threshold value - * balances the likelihood of wasted effort and contention, versus - * providing a worst-case bound on retention of interior nodes in - * quiescent queues. The value defined below was chosen - * empirically to balance these under various timeout scenarios. - * - * Note that we cannot self-link unlinked interior nodes during - * sweeps. However, the associated garbage chains terminate when - * some successor ultimately falls off the head of the list and is - * self-linked. - */ - - /** True if on multiprocessor */ - private static final boolean MP = - Runtime.getRuntime().availableProcessors() > 1; - - /** - * The number of times to spin (with randomly interspersed calls - * to Thread.yield) on multiprocessor before blocking when a node - * is apparently the first waiter in the queue. See above for - * explanation. Must be a power of two. The value is empirically - * derived -- it works pretty well across a variety of processors, - * numbers of CPUs, and OSes. - */ - private static final int FRONT_SPINS = 1 << 7; - - /** - * The number of times to spin before blocking when a node is - * preceded by another node that is apparently spinning. Also - * serves as an increment to FRONT_SPINS on phase changes, and as - * base average frequency for yielding during spins. Must be a - * power of two. - */ - private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; - - /** - * The maximum number of estimated removal failures (sweepVotes) - * to tolerate before sweeping through the queue unlinking - * cancelled nodes that were not unlinked upon initial - * removal. See above for explanation. The value must be at least - * two to avoid useless sweeps when removing trailing nodes. - */ - static final int SWEEP_THRESHOLD = 32; - - /** - * Queue nodes. Uses Object, not E, for items to allow forgetting - * them after use. Relies heavily on Unsafe mechanics to minimize - * unnecessary ordering constraints: Writes that are intrinsically - * ordered wrt other accesses or CASes use simple relaxed forms. - */ - static final class Node { - final boolean isData; // false if this is a request node - volatile Object item; // initially non-null if isData; CASed to match - volatile Node next; - volatile Thread waiter; // null until waiting - - // CAS methods for fields - final boolean casNext(Node cmp, Node val) { - return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); - } - - final boolean casItem(Object cmp, Object val) { - // assert cmp == null || cmp.getClass() != Node.class; - return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); - } - - /** - * Constructs a new node. Uses relaxed write because item can - * only be seen after publication via casNext. - */ - Node(Object item, boolean isData) { - UNSAFE.putObject(this, itemOffset, item); // relaxed write - this.isData = isData; - } - - /** - * Links node to itself to avoid garbage retention. Called - * only after CASing head field, so uses relaxed write. - */ - final void forgetNext() { - UNSAFE.putObject(this, nextOffset, this); - } - - /** - * Sets item to self and waiter to null, to avoid garbage - * retention after matching or cancelling. Uses relaxed writes - * because order is already constrained in the only calling - * contexts: item is forgotten only after volatile/atomic - * mechanics that extract items. Similarly, clearing waiter - * follows either CAS or return from park (if ever parked; - * else we don't care). - */ - final void forgetContents() { - UNSAFE.putObject(this, itemOffset, this); - UNSAFE.putObject(this, waiterOffset, null); - } - - /** - * Returns true if this node has been matched, including the - * case of artificial matches due to cancellation. - */ - final boolean isMatched() { - Object x = item; - return x == this || x == null == isData; - } - - /** - * Returns true if this is an unmatched request node. - */ - final boolean isUnmatchedRequest() { - return !isData && item == null; - } - - /** - * Returns true if a node with the given mode cannot be - * appended to this node because this node is unmatched and - * has opposite data mode. - */ - final boolean cannotPrecede(boolean haveData) { - boolean d = isData; - Object x; - return d != haveData && (x = item) != this && x != null == d; - } - - /** - * Tries to artificially match a data node -- used by remove. - */ - final boolean tryMatchData() { - // assert isData; - Object x = item; - if (x != null && x != this && casItem(x, null)) { - LockSupport.unpark(waiter); - return true; - } - return false; - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long itemOffset; - private static final long nextOffset; - private static final long waiterOffset; - static { - try { - UNSAFE = getUnsafe(); - Class k = Node.class; - itemOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("item")); - nextOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("next")); - waiterOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("waiter")); - } catch (Exception e) { - throw new Error(e); - } - } - } - - /** head of the queue; null until first enqueue */ - transient volatile Node head; - - /** tail of the queue; null until first append */ - private transient volatile Node tail; - - /** The number of apparent failures to unsplice removed nodes */ - private transient volatile int sweepVotes; - - // CAS methods for fields - private boolean casTail(Node cmp, Node val) { - return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); - } - - private boolean casHead(Node cmp, Node val) { - return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); - } - - private boolean casSweepVotes(int cmp, int val) { - return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); - } - - /* - * Possible values for "how" argument in xfer method. - */ - private static final int NOW = 0; // for untimed poll, tryTransfer - private static final int ASYNC = 1; // for offer, put, add - private static final int SYNC = 2; // for transfer, take - private static final int TIMED = 3; // for timed poll, tryTransfer - - @SuppressWarnings("unchecked") - static E cast(Object item) { - // assert item == null || item.getClass() != Node.class; - return (E) item; - } - - /** - * Implements all queuing methods. See above for explanation. - * - * @param e the item or null for take - * @param haveData true if this is a put, else a take - * @param how NOW, ASYNC, SYNC, or TIMED - * @param nanos timeout in nanosecs, used only if mode is TIMED - * @return an item if matched, else e - * @throws NullPointerException if haveData mode but e is null - */ - private E xfer(E e, boolean haveData, int how, long nanos) { - if (haveData && e == null) { - throw new NullPointerException(); - } - Node s = null; // the node to append, if needed - - retry: - for (;;) { // restart on append race - - for (Node h = head, p = h; p != null;) { // find & match first node - boolean isData = p.isData; - Object item = p.item; - if (item != p && item != null == isData) { // unmatched - if (isData == haveData) { - break; - } - if (p.casItem(item, e)) { // match - for (Node q = p; q != h;) { - Node n = q.next; // update by 2 unless singleton - if (head == h && casHead(h, n == null ? q : n)) { - h.forgetNext(); - break; - } // advance and retry - if ((h = head) == null || - (q = h.next) == null || !q.isMatched()) - { - break; // unless slack < 2 - } - } - LockSupport.unpark(p.waiter); - return LinkedTransferQueue.cast(item); - } - } - Node n = p.next; - p = p != n ? n : (h = head); // Use head if p offlist - } - - if (how != NOW) { // No matches available - if (s == null) { - s = new Node(e, haveData); - } - Node pred = tryAppend(s, haveData); - if (pred == null) - { - continue retry; // lost race vs opposite mode - } - if (how != ASYNC) { - return awaitMatch(s, pred, e, how == TIMED, nanos); - } - } - return e; // not waiting - } - } - - /** - * Tries to append node s as tail. - * - * @param s the node to append - * @param haveData true if appending in data mode - * @return null on failure due to losing race with append in - * different mode, else s's predecessor, or s itself if no - * predecessor - */ - private Node tryAppend(Node s, boolean haveData) { - for (Node t = tail, p = t;;) { // move p to last node and append - Node n, u; // temps for reads of next & tail - if (p == null && (p = head) == null) { - if (casHead(null, s)) - { - return s; // initialize - } - } - else if (p.cannotPrecede(haveData)) { - return null; // lost race vs opposite mode - } else if ((n = p.next) != null) { - p = p != t && t != (u = tail) ? (t = u) : // stale tail - p != n ? n : null; // restart if off list - } else if (!p.casNext(null, s)) { - p = p.next; // re-read on CAS failure - } else { - if (p != t) { // update if slack now >= 2 - while ((tail != t || !casTail(t, s)) && - (t = tail) != null && - (s = t.next) != null && // advance and retry - (s = s.next) != null && s != t) { - continue; - } - } - return p; - } - } - } - - /** - * Spins/yields/blocks until node s is matched or caller gives up. - * - * @param s the waiting node - * @param pred the predecessor of s, or s itself if it has no - * predecessor, or null if unknown (the null case does not occur - * in any current calls but may in possible future extensions) - * @param e the comparison value for checking match - * @param timed if true, wait only until timeout elapses - * @param nanos timeout in nanosecs, used only if timed is true - * @return matched item, or e if unmatched on interrupt or timeout - */ - private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { - long lastTime = timed ? System.nanoTime() : 0L; - Thread w = Thread.currentThread(); - int spins = -1; // initialized after first item and cancel checks - ThreadLocalRandom randomYields = null; // bound if needed - - for (;;) { - Object item = s.item; - if (item != e) { // matched - // assert item != s; - s.forgetContents(); // avoid garbage - return LinkedTransferQueue.cast(item); - } - if ((w.isInterrupted() || timed && nanos <= 0) && - s.casItem(e, s)) { // cancel - unsplice(pred, s); - return e; - } - - if (spins < 0) { // establish spins at/near front - if ((spins = spinsFor(pred, s.isData)) > 0) { - randomYields = ThreadLocalRandom.current(); - } - } - else if (spins > 0) { // spin - --spins; - if (randomYields.nextInt(CHAINED_SPINS) == 0) - { - Thread.yield(); // occasionally yield - } - } - else if (s.waiter == null) { - s.waiter = w; // request unpark then recheck - } - else if (timed) { - long now = System.nanoTime(); - if ((nanos -= now - lastTime) > 0) { - LockSupport.parkNanos(this, nanos); - } - lastTime = now; - } - else { - LockSupport.park(this); - } - } - } - - /** - * Returns spin/yield value for a node with given predecessor and - * data mode. See above for explanation. - */ - private static int spinsFor(Node pred, boolean haveData) { - if (MP && pred != null) { - if (pred.isData != haveData) { - return FRONT_SPINS + CHAINED_SPINS; - } - if (pred.isMatched()) { - return FRONT_SPINS; - } - if (pred.waiter == null) { - return CHAINED_SPINS; - } - } - return 0; - } - - /* -------------- Traversal methods -------------- */ - - /** - * Returns the successor of p, or the head node if p.next has been - * linked to self, which will only be true if traversing with a - * stale pointer that is now off the list. - */ - final Node succ(Node p) { - Node next = p.next; - return p == next ? head : next; - } - - /** - * Returns the first unmatched node of the given mode, or null if - * none. Used by methods isEmpty, hasWaitingConsumer. - */ - private Node firstOfMode(boolean isData) { - for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { - return p.isData == isData ? p : null; - } - } - return null; - } - - /** - * Returns the item in the first unmatched node with isData; or - * null if none. Used by peek. - */ - private E firstDataItem() { - for (Node p = head; p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p) { - return LinkedTransferQueue.cast(item); - } - } - else if (item == null) { - return null; - } - } - return null; - } - - /** - * Traverses and counts unmatched nodes of the given mode. - * Used by methods size and getWaitingConsumerCount. - */ - private int countOfMode(boolean data) { - int count = 0; - for (Node p = head; p != null; ) { - if (!p.isMatched()) { - if (p.isData != data) { - return 0; - } - if (++count == Integer.MAX_VALUE) { - break; - } - } - Node n = p.next; - if (n != p) { - p = n; - } else { - count = 0; - p = head; - } - } - return count; - } - - final class Itr implements Iterator { - private Node nextNode; // next node to return item for - private E nextItem; // the corresponding item - private Node lastRet; // last returned node, to support remove - private Node lastPred; // predecessor to unlink lastRet - - /** - * Moves to next node after prev, or first node if prev null. - */ - private void advance(Node prev) { - /* - * To track and avoid buildup of deleted nodes in the face - * of calls to both Queue.remove and Itr.remove, we must - * include variants of unsplice and sweep upon each - * advance: Upon Itr.remove, we may need to catch up links - * from lastPred, and upon other removes, we might need to - * skip ahead from stale nodes and unsplice deleted ones - * found while advancing. - */ - - Node r, b; // reset lastPred upon possible deletion of lastRet - if ((r = lastRet) != null && !r.isMatched()) { - lastPred = r; // next lastPred is old lastRet - } else if ((b = lastPred) == null || b.isMatched()) { - lastPred = null; // at start of list - } else { - Node s, n; // help with removal of lastPred.next - while ((s = b.next) != null && - s != b && s.isMatched() && - (n = s.next) != null && n != s) { - b.casNext(s, n); - } - } - - this.lastRet = prev; - - for (Node p = prev, s, n;;) { - s = p == null ? head : p.next; - if (s == null) { - break; - } else if (s == p) { - p = null; - continue; - } - Object item = s.item; - if (s.isData) { - if (item != null && item != s) { - nextItem = LinkedTransferQueue.cast(item); - nextNode = s; - return; - } - } - else if (item == null) { - break; - } - // assert s.isMatched(); - if (p == null) { - p = s; - } else if ((n = s.next) == null) { - break; - } else if (s == n) { - p = null; - } else { - p.casNext(s, n); - } - } - nextNode = null; - nextItem = null; - } - - Itr() { - advance(null); - } - - @Override - public final boolean hasNext() { - return nextNode != null; - } - - @Override - public final E next() { - Node p = nextNode; - if (p == null) { - throw new NoSuchElementException(); - } - E e = nextItem; - advance(p); - return e; - } - - @Override - public final void remove() { - final Node lastRet = this.lastRet; - if (lastRet == null) { - throw new IllegalStateException(); - } - this.lastRet = null; - if (lastRet.tryMatchData()) { - unsplice(lastPred, lastRet); - } - } - } - - /* -------------- Removal methods -------------- */ - - /** - * Unsplices (now or later) the given deleted/cancelled node with - * the given predecessor. - * - * @param pred a node that was at one time known to be the - * predecessor of s, or null or s itself if s is/was at head - * @param s the node to be unspliced - */ - final void unsplice(Node pred, Node s) { - s.forgetContents(); // forget unneeded fields - /* - * See above for rationale. Briefly: if pred still points to - * s, try to unlink s. If s cannot be unlinked, because it is - * trailing node or pred might be unlinked, and neither pred - * nor s are head or offlist, add to sweepVotes, and if enough - * votes have accumulated, sweep. - */ - if (pred != null && pred != s && pred.next == s) { - Node n = s.next; - if (n == null || - n != s && pred.casNext(s, n) && pred.isMatched()) { - for (;;) { // check if at, or could be, head - Node h = head; - if (h == pred || h == s || h == null) - { - return; // at head or list empty - } - if (!h.isMatched()) { - break; - } - Node hn = h.next; - if (hn == null) - { - return; // now empty - } - if (hn != h && casHead(h, hn)) - { - h.forgetNext(); // advance head - } - } - if (pred.next != pred && s.next != s) { // recheck if offlist - for (;;) { // sweep now if enough votes - int v = sweepVotes; - if (v < SWEEP_THRESHOLD) { - if (casSweepVotes(v, v + 1)) { - break; - } - } - else if (casSweepVotes(v, 0)) { - sweep(); - break; - } - } - } - } - } - } - - /** - * Unlinks matched (typically cancelled) nodes encountered in a - * traversal from head. - */ - private void sweep() { - for (Node p = head, s, n; p != null && (s = p.next) != null; ) { - if (!s.isMatched()) { - // Unmatched nodes are never self-linked - p = s; - } else if ((n = s.next) == null) { - break; - } else if (s == n) { - // No need to also check for p == s, since that implies s == n - p = head; - } else { - p.casNext(s, n); - } - } - } - - /** - * Main implementation of remove(Object) - */ - private boolean findAndRemove(Object e) { - if (e != null) { - for (Node pred = null, p = head; p != null; ) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p && e.equals(item) && - p.tryMatchData()) { - unsplice(pred, p); - return true; - } - } - else if (item == null) { - break; - } - pred = p; - if ((p = p.next) == pred) { // stale - pred = null; - p = head; - } - } - } - return false; - } - - - /** - * Creates an initially empty {@code LinkedTransferQueue}. - */ - public LinkedTransferQueue() { - } - - /** - * Creates a {@code LinkedTransferQueue} - * initially containing the elements of the given collection, - * added in traversal order of the collection's iterator. - * - * @param c the collection of elements to initially contain - * @throws NullPointerException if the specified collection or any - * of its elements are null - */ - public LinkedTransferQueue(Collection c) { - this(); - addAll(c); - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never block. - * - * @throws NullPointerException if the specified element is null - */ - @Override - public void put(E e) { - xfer(e, true, ASYNC, 0); - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never block or - * return {@code false}. - * - * @return {@code true} (as specified by - * {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit) - * BlockingQueue.offer}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean offer(E e, long timeout, TimeUnit unit) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never return {@code false}. - * - * @return {@code true} (as specified by {@link Queue#offer}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean offer(E e) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Inserts the specified element at the tail of this queue. - * As the queue is unbounded, this method will never throw - * {@link IllegalStateException} or return {@code false}. - * - * @return {@code true} (as specified by {@link Collection#add}) - * @throws NullPointerException if the specified element is null - */ - @Override - public boolean add(E e) { - xfer(e, true, ASYNC, 0); - return true; - } - - /** - * Transfers the element to a waiting consumer immediately, if possible. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * otherwise returning {@code false} without enqueuing the element. - * - * @throws NullPointerException if the specified element is null - */ - public boolean tryTransfer(E e) { - return xfer(e, true, NOW, 0) == null; - } - - /** - * Transfers the element to a consumer, waiting if necessary to do so. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * else inserts the specified element at the tail of this queue - * and waits until the element is received by a consumer. - * - * @throws NullPointerException if the specified element is null - */ - public void transfer(E e) throws InterruptedException { - if (xfer(e, true, SYNC, 0) != null) { - Thread.interrupted(); // failure possible only due to interrupt - throw new InterruptedException(); - } - } - - /** - * Transfers the element to a consumer if it is possible to do so - * before the timeout elapses. - * - *

More precisely, transfers the specified element immediately - * if there exists a consumer already waiting to receive it (in - * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), - * else inserts the specified element at the tail of this queue - * and waits until the element is received by a consumer, - * returning {@code false} if the specified wait time elapses - * before the element can be transferred. - * - * @throws NullPointerException if the specified element is null - */ - public boolean tryTransfer(E e, long timeout, TimeUnit unit) - throws InterruptedException { - if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) { - return true; - } - if (!Thread.interrupted()) { - return false; - } - throw new InterruptedException(); - } - - @Override - public E take() throws InterruptedException { - E e = xfer(null, false, SYNC, 0); - if (e != null) { - return e; - } - Thread.interrupted(); - throw new InterruptedException(); - } - - @Override - public E poll(long timeout, TimeUnit unit) throws InterruptedException { - E e = xfer(null, false, TIMED, unit.toNanos(timeout)); - if (e != null || !Thread.interrupted()) { - return e; - } - throw new InterruptedException(); - } - - @Override - public E poll() { - return xfer(null, false, NOW, 0); - } - - /** - * @throws NullPointerException {@inheritDoc} - * @throws IllegalArgumentException {@inheritDoc} - */ - @Override - public int drainTo(Collection c) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - for (E e; (e = poll()) != null;) { - c.add(e); - ++n; - } - return n; - } - - /** - * @throws NullPointerException {@inheritDoc} - * @throws IllegalArgumentException {@inheritDoc} - */ - @Override - public int drainTo(Collection c, int maxElements) { - if (c == null) { - throw new NullPointerException(); - } - if (c == this) { - throw new IllegalArgumentException(); - } - int n = 0; - for (E e; n < maxElements && (e = poll()) != null;) { - c.add(e); - ++n; - } - return n; - } - - /** - * Returns an iterator over the elements in this queue in proper sequence. - * The elements will be returned in order from first (head) to last (tail). - * - *

The returned iterator is a "weakly consistent" iterator that - * will never throw {@link java.util.ConcurrentModificationException - * ConcurrentModificationException}, and guarantees to traverse - * elements as they existed upon construction of the iterator, and - * may (but is not guaranteed to) reflect any modifications - * subsequent to construction. - * - * @return an iterator over the elements in this queue in proper sequence - */ - @Override - public Iterator iterator() { - return new Itr(); - } - - @Override - public E peek() { - return firstDataItem(); - } - - /** - * Returns {@code true} if this queue contains no elements. - * - * @return {@code true} if this queue contains no elements - */ - @Override - public boolean isEmpty() { - for (Node p = head; p != null; p = succ(p)) { - if (!p.isMatched()) { - return !p.isData; - } - } - return true; - } - - public boolean hasWaitingConsumer() { - return firstOfMode(false) != null; - } - - /** - * Returns the number of elements in this queue. If this queue - * contains more than {@code Integer.MAX_VALUE} elements, returns - * {@code Integer.MAX_VALUE}. - * - *

Beware that, unlike in most collections, this method is - * NOT a constant-time operation. Because of the - * asynchronous nature of these queues, determining the current - * number of elements requires an O(n) traversal. - * - * @return the number of elements in this queue - */ - @Override - public int size() { - return countOfMode(true); - } - - public int getWaitingConsumerCount() { - return countOfMode(false); - } - - /** - * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element {@code e} such - * that {@code o.equals(e)}, if this queue contains one or more such - * elements. - * Returns {@code true} if this queue contained the specified element - * (or equivalently, if this queue changed as a result of the call). - * - * @param o element to be removed from this queue, if present - * @return {@code true} if this queue changed as a result of the call - */ - @Override - public boolean remove(Object o) { - return findAndRemove(o); - } - - /** - * Returns {@code true} if this queue contains the specified element. - * More formally, returns {@code true} if and only if this queue contains - * at least one element {@code e} such that {@code o.equals(e)}. - * - * @param o object to be checked for containment in this queue - * @return {@code true} if this queue contains the specified element - */ - @Override - public boolean contains(Object o) { - if (o == null) { - return false; - } - for (Node p = head; p != null; p = succ(p)) { - Object item = p.item; - if (p.isData) { - if (item != null && item != p && o.equals(item)) { - return true; - } - } - else if (item == null) { - break; - } - } - return false; - } - - /** - * Always returns {@code Integer.MAX_VALUE} because a - * {@code LinkedTransferQueue} is not capacity constrained. - * - * @return {@code Integer.MAX_VALUE} (as specified by - * {@link java.util.concurrent.BlockingQueue#remainingCapacity() - * BlockingQueue.remainingCapacity}) - */ - @Override - public int remainingCapacity() { - return Integer.MAX_VALUE; - } - - /** - * Saves the state to a stream (that is, serializes it). - * - * @serialData All of the elements (each an {@code E}) in - * the proper order, followed by a null - * @param s the stream - */ - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - for (E e : this) { - s.writeObject(e); - } - // Use trailing null as sentinel - s.writeObject(null); - } - - /** - * Reconstitutes the Queue instance from a stream (that is, - * deserializes it). - * - * @param s the stream - */ - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - for (;;) { - E item = (E) s.readObject(); - if (item == null) { - break; - } else { - offer(item); - } - } - } - - // Unsafe mechanics - - private static final sun.misc.Unsafe UNSAFE; - private static final long headOffset; - private static final long tailOffset; - private static final long sweepVotesOffset; - static { - try { - UNSAFE = getUnsafe(); - Class k = LinkedTransferQueue.class; - headOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("head")); - tailOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("tail")); - sweepVotesOffset = UNSAFE.objectFieldOffset - (k.getDeclaredField("sweepVotes")); - } catch (Exception e) { - throw new Error(e); - } - } - - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ - static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - @Override - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } - } - -} \ No newline at end of file diff --git a/common/src/main/java/io/netty/util/internal/QueueFactory.java b/common/src/main/java/io/netty/util/internal/QueueFactory.java deleted file mode 100644 index 62382e2250..0000000000 --- a/common/src/main/java/io/netty/util/internal/QueueFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.internal; - -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; - -import java.util.concurrent.BlockingQueue; - -/** - * This factory should be used to create the "optimal" {@link BlockingQueue} - * instance for the running JVM. - */ -public final class QueueFactory { - - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(QueueFactory.class); - - private static final boolean USE_LTQ; - - static { - boolean useLTQ = false; - try { - if (DetectionUtil.hasUnsafe()) { - new LinkedTransferQueue(); - useLTQ = true; - } - logger.debug( - "No access to the Unsafe - using " + - LegacyLinkedTransferQueue.class.getSimpleName() + " instead."); - } catch (Throwable t) { - logger.debug( - "Failed to initialize a " + LinkedTransferQueue.class.getSimpleName() + " - " + - "using " + LegacyLinkedTransferQueue.class.getSimpleName() + " instead.", t); - } - - USE_LTQ = useLTQ; - } - - /** - * Create a new unbound {@link BlockingQueue} - * - * @return queue the {@link BlockingQueue} implementation - */ - public static BlockingQueue createQueue() { - if (USE_LTQ) { - return new LinkedTransferQueue(); - } else { - return new LegacyLinkedTransferQueue(); - } - } - - private QueueFactory() { - // only use static methods! - } -}