diff --git a/src/main/java/org/apache/cassandra/service/CacheService.java b/src/main/java/org/apache/cassandra/service/CacheService.java index ceea0ac..8c1cb06 100644 --- a/src/main/java/org/apache/cassandra/service/CacheService.java +++ b/src/main/java/org/apache/cassandra/service/CacheService.java @@ -24,22 +24,19 @@ package org.apache.cassandra.service; -import java.lang.management.ManagementFactory; import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; -import javax.management.MBeanServer; -import javax.management.ObjectName; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import org.apache.cassandra.metrics.CacheMetrics; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.MetricsMBean; -public class CacheService implements CacheServiceMBean { - private static final java.util.logging.Logger logger = java.util.logging.Logger - .getLogger(CacheService.class.getName()); - private APIClient c = new APIClient(); +public class CacheService extends MetricsMBean implements CacheServiceMBean { + private static final Logger logger = Logger.getLogger(CacheService.class.getName()); public void log(String str) { logger.finest(str); @@ -47,33 +44,15 @@ public class CacheService implements CacheServiceMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Caches"; - public final CacheMetrics keyCache; - public final CacheMetrics rowCache; - public final CacheMetrics counterCache; - public final static CacheService instance = new CacheService(); - - public static CacheService getInstance() { - return instance; - } - - private CacheService() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - keyCache = new CacheMetrics("KeyCache", null); - rowCache = new CacheMetrics("RowCache", "row"); - counterCache = new CacheMetrics("CounterCache", null); + public CacheService(APIClient client) { + super(MBEAN_NAME, client, new CacheMetrics("KeyCache", "key"), new CacheMetrics("RowCache", "row"), + new CacheMetrics("CounterCache", "counter")); } @Override public int getRowCacheSavePeriodInSeconds() { log(" getRowCacheSavePeriodInSeconds()"); - return c.getIntValue("cache_service/row_cache_save_period"); + return client.getIntValue("cache_service/row_cache_save_period"); } @Override @@ -81,13 +60,13 @@ public class CacheService implements CacheServiceMBean { log(" setRowCacheSavePeriodInSeconds(int rcspis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("period", Integer.toString(rcspis)); - c.post("cache_service/row_cache_save_period", queryParams); + client.post("cache_service/row_cache_save_period", queryParams); } @Override public int getKeyCacheSavePeriodInSeconds() { log(" getKeyCacheSavePeriodInSeconds()"); - return c.getIntValue("cache_service/key_cache_save_period"); + return client.getIntValue("cache_service/key_cache_save_period"); } @Override @@ -95,13 +74,13 @@ public class CacheService implements CacheServiceMBean { log(" setKeyCacheSavePeriodInSeconds(int kcspis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("period", Integer.toString(kcspis)); - c.post("cache_service/key_cache_save_period", queryParams); + client.post("cache_service/key_cache_save_period", queryParams); } @Override public int getCounterCacheSavePeriodInSeconds() { log(" getCounterCacheSavePeriodInSeconds()"); - return c.getIntValue("cache_service/counter_cache_save_period"); + return client.getIntValue("cache_service/counter_cache_save_period"); } @Override @@ -109,13 +88,13 @@ public class CacheService implements CacheServiceMBean { log(" setCounterCacheSavePeriodInSeconds(int ccspis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("ccspis", Integer.toString(ccspis)); - c.post("cache_service/counter_cache_save_period", queryParams); + client.post("cache_service/counter_cache_save_period", queryParams); } @Override public int getRowCacheKeysToSave() { log(" getRowCacheKeysToSave()"); - return c.getIntValue("cache_service/row_cache_keys_to_save"); + return client.getIntValue("cache_service/row_cache_keys_to_save"); } @Override @@ -123,13 +102,13 @@ public class CacheService implements CacheServiceMBean { log(" setRowCacheKeysToSave(int rckts)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("rckts", Integer.toString(rckts)); - c.post("cache_service/row_cache_keys_to_save", queryParams); + client.post("cache_service/row_cache_keys_to_save", queryParams); } @Override public int getKeyCacheKeysToSave() { log(" getKeyCacheKeysToSave()"); - return c.getIntValue("cache_service/key_cache_keys_to_save"); + return client.getIntValue("cache_service/key_cache_keys_to_save"); } @Override @@ -137,13 +116,13 @@ public class CacheService implements CacheServiceMBean { log(" setKeyCacheKeysToSave(int kckts)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("kckts", Integer.toString(kckts)); - c.post("cache_service/key_cache_keys_to_save", queryParams); + client.post("cache_service/key_cache_keys_to_save", queryParams); } @Override public int getCounterCacheKeysToSave() { log(" getCounterCacheKeysToSave()"); - return c.getIntValue("cache_service/counter_cache_keys_to_save"); + return client.getIntValue("cache_service/counter_cache_keys_to_save"); } @Override @@ -151,7 +130,7 @@ public class CacheService implements CacheServiceMBean { log(" setCounterCacheKeysToSave(int cckts)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("cckts", Integer.toString(cckts)); - c.post("cache_service/counter_cache_keys_to_save", queryParams); + client.post("cache_service/counter_cache_keys_to_save", queryParams); } /** @@ -160,7 +139,7 @@ public class CacheService implements CacheServiceMBean { @Override public void invalidateKeyCache() { log(" invalidateKeyCache()"); - c.post("cache_service/invalidate_key_cache"); + client.post("cache_service/invalidate_key_cache"); } /** @@ -169,13 +148,13 @@ public class CacheService implements CacheServiceMBean { @Override public void invalidateRowCache() { log(" invalidateRowCache()"); - c.post("cache_service/invalidate_row_cache"); + client.post("cache_service/invalidate_row_cache"); } @Override public void invalidateCounterCache() { log(" invalidateCounterCache()"); - c.post("cache_service/invalidate_counter_cache"); + client.post("cache_service/invalidate_counter_cache"); } @Override @@ -183,7 +162,7 @@ public class CacheService implements CacheServiceMBean { log(" setRowCacheCapacityInMB(long capacity)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("capacity", Long.toString(capacity)); - c.post("cache_service/row_cache_capacity", queryParams); + client.post("cache_service/row_cache_capacity", queryParams); } @Override @@ -191,7 +170,7 @@ public class CacheService implements CacheServiceMBean { log(" setKeyCacheCapacityInMB(long capacity)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("capacity", Long.toString(capacity)); - c.post("cache_service/key_cache_capacity", queryParams); + client.post("cache_service/key_cache_capacity", queryParams); } @Override @@ -199,7 +178,7 @@ public class CacheService implements CacheServiceMBean { log(" setCounterCacheCapacityInMB(long capacity)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("capacity", Long.toString(capacity)); - c.post("cache_service/counter_cache_capacity_in_mb", queryParams); + client.post("cache_service/counter_cache_capacity_in_mb", queryParams); } /** @@ -216,6 +195,6 @@ public class CacheService implements CacheServiceMBean { @Override public void saveCaches() throws ExecutionException, InterruptedException { log(" saveCaches() throws ExecutionException, InterruptedException"); - c.post("cache_service/save_caches"); + client.post("cache_service/save_caches"); } } diff --git a/src/main/java/org/apache/cassandra/service/GCInspector.java b/src/main/java/org/apache/cassandra/service/GCInspector.java index 9b50316..de943c2 100644 --- a/src/main/java/org/apache/cassandra/service/GCInspector.java +++ b/src/main/java/org/apache/cassandra/service/GCInspector.java @@ -24,259 +24,18 @@ package org.apache.cassandra.service; -import java.lang.management.GarbageCollectorMXBean; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryUsage; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.APIMBean; -import javax.management.MBeanServer; -import javax.management.Notification; -import javax.management.NotificationListener; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sun.management.GarbageCollectionNotificationInfo; -import com.sun.management.GcInfo; - -@SuppressWarnings("restriction") -public class GCInspector implements NotificationListener, GCInspectorMXBean -{ +public class GCInspector extends APIMBean implements GCInspectorMXBean { public static final String MBEAN_NAME = "org.apache.cassandra.service:type=GCInspector"; - private static final Logger logger = LoggerFactory.getLogger(GCInspector.class); - final static long MIN_LOG_DURATION = 200; - final static long MIN_LOG_DURATION_TPSTATS = 1000; - static final class State - { - final double maxRealTimeElapsed; - final double totalRealTimeElapsed; - final double sumSquaresRealTimeElapsed; - final double totalBytesReclaimed; - final double count; - final long startNanos; - - State(double extraElapsed, double extraBytes, State prev) - { - this.totalRealTimeElapsed = prev.totalRealTimeElapsed + extraElapsed; - this.totalBytesReclaimed = prev.totalBytesReclaimed + extraBytes; - this.sumSquaresRealTimeElapsed = prev.sumSquaresRealTimeElapsed + (extraElapsed * extraElapsed); - this.startNanos = prev.startNanos; - this.count = prev.count + 1; - this.maxRealTimeElapsed = Math.max(prev.maxRealTimeElapsed, extraElapsed); - } - - State() - { - count = maxRealTimeElapsed = sumSquaresRealTimeElapsed = totalRealTimeElapsed = totalBytesReclaimed = 0; - startNanos = System.nanoTime(); - } + public GCInspector(APIClient client) { + super(client); } - static final class GCState - { - final GarbageCollectorMXBean gcBean; - final boolean assumeGCIsPartiallyConcurrent; - final boolean assumeGCIsOldGen; - private String[] keys; - long lastGcTotalDuration = 0; - - - GCState(GarbageCollectorMXBean gcBean, boolean assumeGCIsPartiallyConcurrent, boolean assumeGCIsOldGen) - { - this.gcBean = gcBean; - this.assumeGCIsPartiallyConcurrent = assumeGCIsPartiallyConcurrent; - this.assumeGCIsOldGen = assumeGCIsOldGen; - } - - String[] keys(GarbageCollectionNotificationInfo info) - { - if (keys != null) - return keys; - - keys = info.getGcInfo().getMemoryUsageBeforeGc().keySet().toArray(new String[0]); - Arrays.sort(keys); - - return keys; - } - } - - final AtomicReference state = new AtomicReference<>(new State()); - - final Map gcStates = new HashMap<>(); - - public GCInspector() - { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - - try - { - ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*"); - for (ObjectName name : mbs.queryNames(gcName, null)) - { - GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class); - gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc))); - } - - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - public static void register() throws Exception - { - GCInspector inspector = new GCInspector(); - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*"); - for (ObjectName name : server.queryNames(gcName, null)) - { - server.addNotificationListener(name, inspector, null, null); - } - } - - /* - * Assume that a GC type is at least partially concurrent and so a side channel method - * should be used to calculate application stopped time due to the GC. - * - * If the GC isn't recognized then assume that is concurrent and we need to do our own calculation - * via the the side channel. - */ - private static boolean assumeGCIsPartiallyConcurrent(GarbageCollectorMXBean gc) - { - switch (gc.getName()) - { - //First two are from the serial collector - case "Copy": - case "MarkSweepCompact": - //Parallel collector - case "PS MarkSweep": - case "PS Scavenge": - case "G1 Young Generation": - //CMS young generation collector - case "ParNew": - return false; - case "ConcurrentMarkSweep": - case "G1 Old Generation": - return true; - default: - //Assume possibly concurrent if unsure - return true; - } - } - - /* - * Assume that a GC type is an old generation collection so SSTableDeletingTask.rescheduleFailedTasks() - * should be invoked. - * - * Defaults to not invoking SSTableDeletingTask.rescheduleFailedTasks() on unrecognized GC names - */ - private static boolean assumeGCIsOldGen(GarbageCollectorMXBean gc) - { - switch (gc.getName()) - { - case "Copy": - case "PS Scavenge": - case "G1 Young Generation": - case "ParNew": - return false; - case "MarkSweepCompact": - case "PS MarkSweep": - case "ConcurrentMarkSweep": - case "G1 Old Generation": - return true; - default: - //Assume not old gen otherwise, don't call - //SSTableDeletingTask.rescheduleFailedTasks() - return false; - } - } - - public void handleNotification(final Notification notification, final Object handback) - { - String type = notification.getType(); - if (type.equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION)) - { - // retrieve the garbage collection notification information - CompositeData cd = (CompositeData) notification.getUserData(); - GarbageCollectionNotificationInfo info = GarbageCollectionNotificationInfo.from(cd); - String gcName = info.getGcName(); - GcInfo gcInfo = info.getGcInfo(); - - long duration = gcInfo.getDuration(); - - /* - * The duration supplied in the notification info includes more than just - * application stopped time for concurrent GCs. Try and do a better job coming up with a good stopped time - * value by asking for and tracking cumulative time spent blocked in GC. - */ - GCState gcState = gcStates.get(gcName); - if (gcState.assumeGCIsPartiallyConcurrent) - { - long previousTotal = gcState.lastGcTotalDuration; - long total = gcState.gcBean.getCollectionTime(); - gcState.lastGcTotalDuration = total; - duration = total - previousTotal; // may be zero for a really fast collection - } - - StringBuilder sb = new StringBuilder(); - sb.append(info.getGcName()).append(" GC in ").append(duration).append("ms. "); - long bytes = 0; - Map beforeMemoryUsage = gcInfo.getMemoryUsageBeforeGc(); - Map afterMemoryUsage = gcInfo.getMemoryUsageAfterGc(); - for (String key : gcState.keys(info)) - { - MemoryUsage before = beforeMemoryUsage.get(key); - MemoryUsage after = afterMemoryUsage.get(key); - if (after != null && after.getUsed() != before.getUsed()) - { - sb.append(key).append(": ").append(before.getUsed()); - sb.append(" -> "); - sb.append(after.getUsed()); - if (!key.equals(gcState.keys[gcState.keys.length - 1])) - sb.append("; "); - bytes += before.getUsed() - after.getUsed(); - } - } - - while (true) - { - State prev = state.get(); - if (state.compareAndSet(prev, new State(duration, bytes, prev))) - break; - } - - String st = sb.toString(); - if (duration > MIN_LOG_DURATION) - logger.trace(st); - else if (logger.isDebugEnabled()) - logger.debug(st); - } - } - - public State getTotalSinceLastCheck() - { - return state.getAndSet(new State()); - } - - public double[] getAndResetStats() - { - State state = getTotalSinceLastCheck(); - double[] r = new double[6]; - r[0] = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - state.startNanos); - r[1] = state.maxRealTimeElapsed; - r[2] = state.totalRealTimeElapsed; - r[3] = state.sumSquaresRealTimeElapsed; - r[4] = state.totalBytesReclaimed; - r[5] = state.count; - return r; + @Override + public double[] getAndResetStats() { + return new double[6]; } } diff --git a/src/main/java/org/apache/cassandra/service/StorageProxy.java b/src/main/java/org/apache/cassandra/service/StorageProxy.java index 04e41e2..8a140c7 100644 --- a/src/main/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/main/java/org/apache/cassandra/service/StorageProxy.java @@ -25,67 +25,54 @@ package org.apache.cassandra.service; import static java.util.Collections.emptySet; -import java.lang.management.ManagementFactory; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.logging.Logger; -import javax.management.MBeanServer; -import javax.management.ObjectName; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; +import org.apache.cassandra.metrics.CASClientRequestMetrics; +import org.apache.cassandra.metrics.ClientRequestMetrics; + import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.MetricsMBean; -public class StorageProxy implements StorageProxyMBean { +public class StorageProxy extends MetricsMBean implements StorageProxyMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy"; - private static final java.util.logging.Logger logger = java.util.logging.Logger - .getLogger(StorageProxy.class.getName()); - - private APIClient c = new APIClient(); + private static final Logger logger = Logger.getLogger(StorageProxy.class.getName()); public void log(String str) { logger.finest(str); } - private static final StorageProxy instance = new StorageProxy(); - - public static StorageProxy getInstance() { - return instance; - } - public static final String UNREACHABLE = "UNREACHABLE"; - private StorageProxy() { - } - - static { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try { - mbs.registerMBean(instance, new ObjectName(MBEAN_NAME)); - } catch (Exception e) { - throw new RuntimeException(e); - } - + public StorageProxy(APIClient client) { + super(MBEAN_NAME, client, new ClientRequestMetrics("Read", "storage_proxy/metrics/read"), + new ClientRequestMetrics("RangeSlice", "/storage_proxy/metrics/range"), + new ClientRequestMetrics("Write", "storage_proxy/metrics/write"), + new CASClientRequestMetrics("CASWrite", "storage_proxy/metrics/cas_write"), + new CASClientRequestMetrics("CASRead", "storage_proxy/metrics/cas_read")); } @Override public long getTotalHints() { log(" getTotalHints()"); - return c.getLongValue("storage_proxy/total_hints"); + return client.getLongValue("storage_proxy/total_hints"); } @Override public boolean getHintedHandoffEnabled() { log(" getHintedHandoffEnabled()"); - return c.getBooleanValue("storage_proxy/hinted_handoff_enabled"); + return client.getBooleanValue("storage_proxy/hinted_handoff_enabled"); } @Override public Set getHintedHandoffEnabledByDC() { log(" getHintedHandoffEnabledByDC()"); - return c.getSetStringValue( - "storage_proxy/hinted_handoff_enabled_by_dc"); + return client.getSetStringValue("storage_proxy/hinted_handoff_enabled_by_dc"); } @Override @@ -93,7 +80,7 @@ public class StorageProxy implements StorageProxyMBean { log(" setHintedHandoffEnabled(boolean b)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("enable", Boolean.toString(b)); - c.post("storage_proxy/hinted_handoff_enabled", queryParams); + client.post("storage_proxy/hinted_handoff_enabled", queryParams); } @Override @@ -101,13 +88,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setHintedHandoffEnabledByDCList(String dcs)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("dcs", dcs); - c.post("storage_proxy/hinted_handoff_enabled_by_dc_list"); + client.post("storage_proxy/hinted_handoff_enabled_by_dc_list"); } @Override public int getMaxHintWindow() { log(" getMaxHintWindow()"); - return c.getIntValue("storage_proxy/max_hint_window"); + return client.getIntValue("storage_proxy/max_hint_window"); } @Override @@ -115,13 +102,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setMaxHintWindow(int ms)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("ms", Integer.toString(ms)); - c.post("storage_proxy/max_hint_window", queryParams); + client.post("storage_proxy/max_hint_window", queryParams); } @Override public int getMaxHintsInProgress() { log(" getMaxHintsInProgress()"); - return c.getIntValue("storage_proxy/max_hints_in_progress"); + return client.getIntValue("storage_proxy/max_hints_in_progress"); } @Override @@ -129,19 +116,19 @@ public class StorageProxy implements StorageProxyMBean { log(" setMaxHintsInProgress(int qs)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("qs", Integer.toString(qs)); - c.post("storage_proxy/max_hints_in_progress", queryParams); + client.post("storage_proxy/max_hints_in_progress", queryParams); } @Override public int getHintsInProgress() { log(" getHintsInProgress()"); - return c.getIntValue("storage_proxy/hints_in_progress"); + return client.getIntValue("storage_proxy/hints_in_progress"); } @Override public Long getRpcTimeout() { log(" getRpcTimeout()"); - return c.getLongValue("storage_proxy/rpc_timeout"); + return client.getLongValue("storage_proxy/rpc_timeout"); } @Override @@ -149,13 +136,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/rpc_timeout", queryParams); + client.post("storage_proxy/rpc_timeout", queryParams); } @Override public Long getReadRpcTimeout() { log(" getReadRpcTimeout()"); - return c.getLongValue("storage_proxy/read_rpc_timeout"); + return client.getLongValue("storage_proxy/read_rpc_timeout"); } @Override @@ -163,13 +150,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setReadRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/read_rpc_timeout", queryParams); + client.post("storage_proxy/read_rpc_timeout", queryParams); } @Override public Long getWriteRpcTimeout() { log(" getWriteRpcTimeout()"); - return c.getLongValue("storage_proxy/write_rpc_timeout"); + return client.getLongValue("storage_proxy/write_rpc_timeout"); } @Override @@ -177,13 +164,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setWriteRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/write_rpc_timeout", queryParams); + client.post("storage_proxy/write_rpc_timeout", queryParams); } @Override public Long getCounterWriteRpcTimeout() { log(" getCounterWriteRpcTimeout()"); - return c.getLongValue("storage_proxy/counter_write_rpc_timeout"); + return client.getLongValue("storage_proxy/counter_write_rpc_timeout"); } @Override @@ -191,13 +178,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setCounterWriteRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/counter_write_rpc_timeout", queryParams); + client.post("storage_proxy/counter_write_rpc_timeout", queryParams); } @Override public Long getCasContentionTimeout() { log(" getCasContentionTimeout()"); - return c.getLongValue("storage_proxy/cas_contention_timeout"); + return client.getLongValue("storage_proxy/cas_contention_timeout"); } @Override @@ -205,13 +192,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setCasContentionTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/cas_contention_timeout", queryParams); + client.post("storage_proxy/cas_contention_timeout", queryParams); } @Override public Long getRangeRpcTimeout() { log(" getRangeRpcTimeout()"); - return c.getLongValue("storage_proxy/range_rpc_timeout"); + return client.getLongValue("storage_proxy/range_rpc_timeout"); } @Override @@ -219,13 +206,13 @@ public class StorageProxy implements StorageProxyMBean { log(" setRangeRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/range_rpc_timeout", queryParams); + client.post("storage_proxy/range_rpc_timeout", queryParams); } @Override public Long getTruncateRpcTimeout() { log(" getTruncateRpcTimeout()"); - return c.getLongValue("storage_proxy/truncate_rpc_timeout"); + return client.getLongValue("storage_proxy/truncate_rpc_timeout"); } @Override @@ -233,43 +220,42 @@ public class StorageProxy implements StorageProxyMBean { log(" setTruncateRpcTimeout(Long timeoutInMillis)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("timeout", Long.toString(timeoutInMillis)); - c.post("storage_proxy/truncate_rpc_timeout", queryParams); + client.post("storage_proxy/truncate_rpc_timeout", queryParams); } @Override public void reloadTriggerClasses() { log(" reloadTriggerClasses()"); - c.post("storage_proxy/reload_trigger_classes"); + client.post("storage_proxy/reload_trigger_classes"); } @Override public long getReadRepairAttempted() { log(" getReadRepairAttempted()"); - return c.getLongValue("storage_proxy/read_repair_attempted"); + return client.getLongValue("storage_proxy/read_repair_attempted"); } @Override public long getReadRepairRepairedBlocking() { log(" getReadRepairRepairedBlocking()"); - return c.getLongValue("storage_proxy/read_repair_repaired_blocking"); + return client.getLongValue("storage_proxy/read_repair_repaired_blocking"); } @Override public long getReadRepairRepairedBackground() { log(" getReadRepairRepairedBackground()"); - return c.getLongValue("storage_proxy/read_repair_repaired_background"); + return client.getLongValue("storage_proxy/read_repair_repaired_background"); } /** Returns each live node's schema version */ @Override public Map> getSchemaVersions() { log(" getSchemaVersions()"); - return c.getMapStringListStrValue("storage_proxy/schema_versions"); + return client.getMapStringListStrValue("storage_proxy/schema_versions"); } @Override - public void setNativeTransportMaxConcurrentConnections( - Long nativeTransportMaxConcurrentConnections) { + public void setNativeTransportMaxConcurrentConnections(Long nativeTransportMaxConcurrentConnections) { // TODO Auto-generated method stub log(" setNativeTransportMaxConcurrentConnections()"); @@ -279,21 +265,21 @@ public class StorageProxy implements StorageProxyMBean { public Long getNativeTransportMaxConcurrentConnections() { // TODO Auto-generated method stub log(" getNativeTransportMaxConcurrentConnections()"); - return c.getLongValue(""); + return client.getLongValue(""); } @Override public void enableHintsForDC(String dc) { - // TODO if/when scylla uses hints + // TODO if/when scylla uses hints log(" enableHintsForDC()"); } @Override public void disableHintsForDC(String dc) { - // TODO if/when scylla uses hints + // TODO if/when scylla uses hints log(" disableHintsForDC()"); } - + @Override public Set getHintedHandoffDisabledDCs() { // TODO if/when scylla uses hints diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index a10880c..7b7aa7e 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -23,7 +23,6 @@ package org.apache.cassandra.service; import java.io.IOException; -import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -42,23 +41,27 @@ import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; import javax.json.JsonArray; import javax.json.JsonObject; -import javax.management.MBeanServer; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; import javax.management.Notification; +import javax.management.NotificationBroadcaster; import javax.management.NotificationBroadcasterSupport; -import javax.management.ObjectName; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; import javax.management.openmbean.TabularData; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.repair.RepairParallelism; -import org.apache.cassandra.streaming.StreamManager; import com.google.common.base.Joiner; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.MetricsMBean; import com.scylladb.jmx.utils.FileUtils; /** @@ -66,42 +69,46 @@ import com.scylladb.jmx.utils.FileUtils; * space. This token gets gossiped around. This class will also maintain * histograms of the load information of other nodes in the cluster. */ -public class StorageService extends NotificationBroadcasterSupport - implements StorageServiceMBean { - private static final java.util.logging.Logger logger = java.util.logging.Logger - .getLogger(StorageService.class.getName()); +public class StorageService extends MetricsMBean implements StorageServiceMBean, NotificationBroadcaster { + private static final Logger logger = Logger.getLogger(StorageService.class.getName()); + private static final Timer timer = new Timer("Storage Service Repair", true); - private APIClient c = new APIClient(); - private static Timer timer = new Timer("Storage Service Repair"); - @SuppressWarnings("unused") - private StorageMetrics metrics = new StorageMetrics(); + private final NotificationBroadcasterSupport notificationBroadcasterSupport = new NotificationBroadcasterSupport(); - public static final StorageService instance = new StorageService(); - - public static StorageService getInstance() { - return instance; + @Override + public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) { + notificationBroadcasterSupport.addNotificationListener(listener, filter, handback); } - public static enum RepairStatus - { + @Override + public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException { + notificationBroadcasterSupport.removeNotificationListener(listener); + } + + @Override + public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) + throws ListenerNotFoundException { + notificationBroadcasterSupport.removeNotificationListener(listener, filter, handback); + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() { + return notificationBroadcasterSupport.getNotificationInfo(); + } + + public void sendNotification(Notification notification) { + notificationBroadcasterSupport.sendNotification(notification); + } + + public static enum RepairStatus { STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED } /* JMX notification serial number counter */ private final AtomicLong notificationSerialNumber = new AtomicLong(); - private final ObjectName jmxObjectName; - - public StorageService() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try { - jmxObjectName = new ObjectName( - "org.apache.cassandra.db:type=StorageService"); - mbs.registerMBean(this, jmxObjectName); - mbs.registerMBean(StreamManager.getInstance(), new ObjectName(StreamManager.OBJECT_NAME)); - } catch (Exception e) { - throw new RuntimeException(e); - } + public StorageService(APIClient client) { + super("org.apache.cassandra.db:type=StorageService", client, new StorageMetrics()); } @@ -118,7 +125,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getLiveNodes() { log(" getLiveNodes()"); - return c.getListStrValue("/gossiper/endpoint/live"); + return client.getListStrValue("/gossiper/endpoint/live"); } /** @@ -130,7 +137,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getUnreachableNodes() { log(" getUnreachableNodes()"); - return c.getListStrValue("/gossiper/endpoint/down"); + return client.getListStrValue("/gossiper/endpoint/down"); } /** @@ -141,7 +148,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getJoiningNodes() { log(" getJoiningNodes()"); - return c.getListStrValue("/storage_service/nodes/joining"); + return client.getListStrValue("/storage_service/nodes/joining"); } /** @@ -152,7 +159,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getLeavingNodes() { log(" getLeavingNodes()"); - return c.getListStrValue("/storage_service/nodes/leaving"); + return client.getListStrValue("/storage_service/nodes/leaving"); } /** @@ -163,7 +170,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getMovingNodes() { log(" getMovingNodes()"); - return c.getListStrValue("/storage_service/nodes/moving"); + return client.getListStrValue("/storage_service/nodes/moving"); } /** @@ -193,7 +200,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List getTokens(String endpoint) throws UnknownHostException { log(" getTokens(String endpoint) throws UnknownHostException"); - return c.getListStrValue("/storage_service/tokens/" + endpoint); + return client.getListStrValue("/storage_service/tokens/" + endpoint); } /** @@ -204,7 +211,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getReleaseVersion() { log(" getReleaseVersion()"); - return c.getStringValue("/storage_service/release_version"); + return client.getStringValue("/storage_service/release_version"); } /** @@ -215,7 +222,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getSchemaVersion() { log(" getSchemaVersion()"); - return c.getStringValue("/storage_service/schema_version"); + return client.getStringValue("/storage_service/schema_version"); } /** @@ -226,7 +233,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String[] getAllDataFileLocations() { log(" getAllDataFileLocations()"); - return c.getStringArrValue("/storage_service/data_file/locations"); + return client.getStringArrValue("/storage_service/data_file/locations"); } /** @@ -237,7 +244,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getCommitLogLocation() { log(" getCommitLogLocation()"); - return c.getStringValue("/storage_service/commitlog"); + return client.getStringValue("/storage_service/commitlog"); } /** @@ -248,7 +255,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getSavedCachesLocation() { log(" getSavedCachesLocation()"); - return c.getStringValue("/storage_service/saved_caches/location"); + return client.getStringValue("/storage_service/saved_caches/location"); } /** @@ -258,10 +265,9 @@ public class StorageService extends NotificationBroadcasterSupport * @return mapping of ranges to end points */ @Override - public Map, List> getRangeToEndpointMap( - String keyspace) { + public Map, List> getRangeToEndpointMap(String keyspace) { log(" getRangeToEndpointMap(String keyspace)"); - return c.getMapListStrValue("/storage_service/range/" + keyspace); + return client.getMapListStrValue("/storage_service/range/" + keyspace); } /** @@ -271,13 +277,11 @@ public class StorageService extends NotificationBroadcasterSupport * @return mapping of ranges to rpc addresses */ @Override - public Map, List> getRangeToRpcaddressMap( - String keyspace) { + public Map, List> getRangeToRpcaddressMap(String keyspace) { log(" getRangeToRpcaddressMap(String keyspace)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("rpc", "true"); - return c.getMapListStrValue("/storage_service/range/" + keyspace, - queryParams); + return client.getMapListStrValue("/storage_service/range/" + keyspace, queryParams); } /** @@ -293,7 +297,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List describeRingJMX(String keyspace) throws IOException { log(" describeRingJMX(String keyspace) throws IOException"); - JsonArray arr = c.getJsonArray("/storage_service/describe_ring/" + keyspace); + JsonArray arr = client.getJsonArray("/storage_service/describe_ring/" + keyspace); List res = new ArrayList(); for (int i = 0; i < arr.size(); i++) { @@ -352,11 +356,9 @@ public class StorageService extends NotificationBroadcasterSupport * @return a map of pending ranges to endpoints */ @Override - public Map, List> getPendingRangeToEndpointMap( - String keyspace) { + public Map, List> getPendingRangeToEndpointMap(String keyspace) { log(" getPendingRangeToEndpointMap(String keyspace)"); - return c.getMapListStrValue( - "/storage_service/pending_range/" + keyspace); + return client.getMapListStrValue("/storage_service/pending_range/" + keyspace); } /** @@ -367,13 +369,13 @@ public class StorageService extends NotificationBroadcasterSupport @Override public Map getTokenToEndpointMap() { log(" getTokenToEndpointMap()"); - Map mapInetAddress = c.getMapStrValue("/storage_service/tokens_endpoint"); - // in order to preserve tokens in ascending order, we use LinkedHashMap here + Map mapInetAddress = client.getMapStrValue("/storage_service/tokens_endpoint"); + // in order to preserve tokens in ascending order, we use LinkedHashMap + // here Map mapString = new LinkedHashMap<>(mapInetAddress.size()); List tokens = new ArrayList<>(mapInetAddress.keySet()); Collections.sort(tokens); - for (String token : tokens) - { + for (String token : tokens) { mapString.put(token, mapInetAddress.get(token)); } return mapString; @@ -383,7 +385,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getLocalHostId() { log(" getLocalHostId()"); - return c.getStringValue("/storage_service/hostid/local"); + return client.getStringValue("/storage_service/hostid/local"); } public String getLocalBroadCastingAddress() { @@ -393,17 +395,18 @@ public class StorageService extends NotificationBroadcasterSupport // we will use the getHostIdToAddressMap with the hostid return getHostIdToAddressMap().get(getLocalHostId()); } + /** Retrieve the mapping of endpoint to host ID */ @Override public Map getHostIdMap() { log(" getHostIdMap()"); - return c.getMapStrValue("/storage_service/host_id"); + return client.getMapStrValue("/storage_service/host_id"); } /** Retrieve the mapping of endpoint to host ID */ public Map getHostIdToAddressMap() { log(" getHostIdToAddressMap()"); - return c.getReverseMapStrValue("/storage_service/host_id"); + return client.getReverseMapStrValue("/storage_service/host_id"); } /** @@ -414,7 +417,7 @@ public class StorageService extends NotificationBroadcasterSupport @Deprecated public double getLoad() { log(" getLoad()"); - return c.getDoubleValue("/storage_service/load"); + return client.getDoubleValue("/storage_service/load"); } /** Human-readable load value */ @@ -430,8 +433,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" getLoadMap()"); Map load = getLoadMapAsDouble(); Map map = new HashMap<>(); - for (Map.Entry entry : load.entrySet()) - { + for (Map.Entry entry : load.entrySet()) { map.put(entry.getKey(), FileUtils.stringifyFileSize(entry.getValue())); } return map; @@ -439,7 +441,7 @@ public class StorageService extends NotificationBroadcasterSupport public Map getLoadMapAsDouble() { log(" getLoadMapAsDouble()"); - return c.getMapStringDouble("/storage_service/load_map"); + return client.getMapStringDouble("/storage_service/load_map"); } /** @@ -450,7 +452,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public int getCurrentGenerationNumber() { log(" getCurrentGenerationNumber()"); - return c.getIntValue("/storage_service/generation_number"); + return client.getIntValue("/storage_service/generation_number"); } /** @@ -466,22 +468,18 @@ public class StorageService extends NotificationBroadcasterSupport * the endpoint responsible for this key */ @Override - public List getNaturalEndpoints(String keyspaceName, String cf, - String key) { + public List getNaturalEndpoints(String keyspaceName, String cf, String key) { log(" getNaturalEndpoints(String keyspaceName, String cf, String key)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("cf", cf); queryParams.add("key", key); - return c.getListInetAddressValue( - "/storage_service/natural_endpoints/" + keyspaceName, - queryParams); + return client.getListInetAddressValue("/storage_service/natural_endpoints/" + keyspaceName, queryParams); } @Override - public List getNaturalEndpoints(String keyspaceName, - ByteBuffer key) { + public List getNaturalEndpoints(String keyspaceName, ByteBuffer key) { log(" getNaturalEndpoints(String keyspaceName, ByteBuffer key)"); - return c.getListInetAddressValue(""); + return client.getListInetAddressValue(""); } /** @@ -494,14 +492,12 @@ public class StorageService extends NotificationBroadcasterSupport * the name of the keyspaces to snapshot; empty means "all." */ @Override - public void takeSnapshot(String tag, String... keyspaceNames) - throws IOException { + public void takeSnapshot(String tag, String... keyspaceNames) throws IOException { log(" takeSnapshot(String tag, String... keyspaceNames) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "tag", tag); - APIClient.set_query_param(queryParams, "kn", - APIClient.join(keyspaceNames)); - c.post("/storage_service/snapshots", queryParams); + APIClient.set_query_param(queryParams, "kn", APIClient.join(keyspaceNames)); + client.post("/storage_service/snapshots", queryParams); } /** @@ -516,20 +512,22 @@ public class StorageService extends NotificationBroadcasterSupport * the tag given to the snapshot; may not be null or empty */ @Override - public void takeColumnFamilySnapshot(String keyspaceName, - String columnFamilyName, String tag) throws IOException { + public void takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException { log(" takeColumnFamilySnapshot(String keyspaceName, String columnFamilyName, String tag) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - if (keyspaceName == null) + if (keyspaceName == null) { throw new IOException("You must supply a keyspace name"); - if (columnFamilyName == null) + } + if (columnFamilyName == null) { throw new IOException("You must supply a table name"); - if (tag == null || tag.equals("")) + } + if (tag == null || tag.equals("")) { throw new IOException("You must supply a snapshot name."); + } queryParams.add("tag", tag); queryParams.add("kn", keyspaceName); queryParams.add("cf", columnFamilyName); - c.post("/storage_service/snapshots", queryParams); + client.post("/storage_service/snapshots", queryParams); } /** @@ -537,14 +535,12 @@ public class StorageService extends NotificationBroadcasterSupport * tag is specified we will remove all snapshots. */ @Override - public void clearSnapshot(String tag, String... keyspaceNames) - throws IOException { + public void clearSnapshot(String tag, String... keyspaceNames) throws IOException { log(" clearSnapshot(String tag, String... keyspaceNames) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "tag", tag); - APIClient.set_query_param(queryParams, "kn", - APIClient.join(keyspaceNames)); - c.delete("/storage_service/snapshots", queryParams); + APIClient.set_query_param(queryParams, "kn", APIClient.join(keyspaceNames)); + client.delete("/storage_service/snapshots", queryParams); } /** @@ -555,12 +551,11 @@ public class StorageService extends NotificationBroadcasterSupport @Override public Map getSnapshotDetails() { log(" getSnapshotDetails()"); - return c.getMapStringSnapshotTabularDataValue( - "/storage_service/snapshots", null); + return client.getMapStringSnapshotTabularDataValue("/storage_service/snapshots", null); } public Map>> getSnapshotKeyspaceColumnFamily() { - JsonArray arr = c.getJsonArray("/storage_service/snapshots"); + JsonArray arr = client.getJsonArray("/storage_service/snapshots"); Map>> res = new HashMap>>(); for (int i = 0; i < arr.size(); i++) { JsonObject obj = arr.getJsonObject(i); @@ -588,36 +583,30 @@ public class StorageService extends NotificationBroadcasterSupport @Override public long trueSnapshotsSize() { log(" trueSnapshotsSize()"); - return c.getLongValue("/storage_service/snapshots/size/true"); + return client.getLongValue("/storage_service/snapshots/size/true"); } /** * Forces major compaction of a single keyspace */ - public void forceKeyspaceCompaction(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException { + public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - c.post("/storage_service/keyspace_compaction/" + keyspaceName, - queryParams); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + client.post("/storage_service/keyspace_compaction/" + keyspaceName, queryParams); } /** * Trigger a cleanup of keys on a single keyspace */ @Override - public int forceKeyspaceCleanup(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException { + public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - return c.postInt("/storage_service/keyspace_cleanup/" + keyspaceName, - queryParams); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + return client.postInt("/storage_service/keyspace_cleanup/" + keyspaceName, queryParams); } /** @@ -628,27 +617,21 @@ public class StorageService extends NotificationBroadcasterSupport * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ @Override - public int scrub(boolean disableSnapshot, boolean skipCorrupted, - String keyspaceName, String... columnFamilies) throws IOException, - ExecutionException, InterruptedException { + public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException { log(" scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies); } @Override - public int scrub(boolean disableSnapshot, boolean skipCorrupted, - boolean checkData, String keyspaceName, String... columnFamilies) - throws IOException, ExecutionException, - InterruptedException { + public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String keyspaceName, + String... columnFamilies) throws IOException, ExecutionException, InterruptedException { log(" scrub(boolean disableSnapshot, boolean skipCorrupted, bool checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_bool_query_param(queryParams, "disable_snapshot", - disableSnapshot); - APIClient.set_bool_query_param(queryParams, "skip_corrupted", - skipCorrupted); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - return c.getIntValue("/storage_service/keyspace_scrub/" + keyspaceName); + APIClient.set_bool_query_param(queryParams, "disable_snapshot", disableSnapshot); + APIClient.set_bool_query_param(queryParams, "skip_corrupted", skipCorrupted); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + return client.getIntValue("/storage_service/keyspace_scrub/" + keyspaceName); } /** @@ -656,18 +639,13 @@ public class StorageService extends NotificationBroadcasterSupport * bad rows and do not snapshot sstables first. */ @Override - public int upgradeSSTables(String keyspaceName, - boolean excludeCurrentVersion, String... columnFamilies) - throws IOException, ExecutionException, - InterruptedException { + public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException { log(" upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_bool_query_param(queryParams, "exclude_current_version", - excludeCurrentVersion); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - return c.getIntValue( - "/storage_service/keyspace_upgrade_sstables/" + keyspaceName); + APIClient.set_bool_query_param(queryParams, "exclude_current_version", excludeCurrentVersion); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + return client.getIntValue("/storage_service/keyspace_upgrade_sstables/" + keyspaceName); } /** @@ -679,23 +657,22 @@ public class StorageService extends NotificationBroadcasterSupport * @throws IOException */ @Override - public void forceKeyspaceFlush(String keyspaceName, - String... columnFamilies) throws IOException, ExecutionException, - InterruptedException { + public void forceKeyspaceFlush(String keyspaceName, String... columnFamilies) + throws IOException, ExecutionException, InterruptedException { log(" forceKeyspaceFlush(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - c.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + client.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams); } - class CheckRepair extends TimerTask { - private APIClient c = new APIClient(); - int id; - String keyspace; - String message; - MultivaluedMap queryParams = new MultivaluedHashMap(); - int cmd; + private class CheckRepair extends TimerTask { + @SuppressWarnings("unused") + private int id; + private String keyspace; + private String message; + private MultivaluedMap queryParams = new MultivaluedHashMap(); + private int cmd; + public CheckRepair(int id, String keyspace) { this.id = id; this.keyspace = keyspace; @@ -704,15 +681,17 @@ public class StorageService extends NotificationBroadcasterSupport // The returned id is the command number this.cmd = id; } + @Override public void run() { - String status = c.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); + String status = client.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); if (!status.equals("RUNNING")) { cancel(); if (!status.equals("SUCCESSFUL")) { - sendNotification("repair", message + "failed", new int[]{cmd, RepairStatus.SESSION_FAILED.ordinal()}); + sendNotification("repair", message + "failed", + new int[] { cmd, RepairStatus.SESSION_FAILED.ordinal() }); } - sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.FINISHED.ordinal()}); + sendNotification("repair", message + "finished", new int[] { cmd, RepairStatus.FINISHED.ordinal() }); } } @@ -721,24 +700,25 @@ public class StorageService extends NotificationBroadcasterSupport /** * Sends JMX notification to subscribers. * - * @param type Message type - * @param message Message itself - * @param userObject Arbitrary object to attach to notification + * @param type + * Message type + * @param message + * Message itself + * @param userObject + * Arbitrary object to attach to notification */ - public void sendNotification(String type, String message, Object userObject) - { - Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message); + public void sendNotification(String type, String message, Object userObject) { + Notification jmxNotification = new Notification(type, getBoundName(), + notificationSerialNumber.incrementAndGet(), message); jmxNotification.setUserData(userObject); sendNotification(jmxNotification); } - public String getRepairMessage(final int cmd, - final String keyspace, - final int ranges_size, - final RepairParallelism parallelismDegree, - final boolean fullRepair) { - return String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", - cmd, ranges_size, keyspace, parallelismDegree, fullRepair); + public String getRepairMessage(final int cmd, final String keyspace, final int ranges_size, + final RepairParallelism parallelismDegree, final boolean fullRepair) { + return String.format( + "Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", cmd, + ranges_size, keyspace, parallelismDegree, fullRepair); } /** @@ -747,7 +727,7 @@ public class StorageService extends NotificationBroadcasterSupport */ public int waitAndNotifyRepair(int cmd, String keyspace, String message) { logger.finest(message); - sendNotification("repair", message, new int[]{cmd, RepairStatus.STARTED.ordinal()}); + sendNotification("repair", message, new int[] { cmd, RepairStatus.STARTED.ordinal() }); TimerTask taskToExecute = new CheckRepair(cmd, keyspace); timer.schedule(taskToExecute, 100, 1000); return cmd; @@ -773,16 +753,15 @@ public class StorageService extends NotificationBroadcasterSupport APIClient.set_query_param(queryParams, op, options.get(op)); } - int cmd = c.postInt("/storage_service/repair_async/" + keyspace, queryParams); + int cmd = client.postInt("/storage_service/repair_async/" + keyspace, queryParams); waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true)); return cmd; } @Override - public int forceRepairAsync(String keyspace, boolean isSequential, - Collection dataCenters, Collection hosts, - boolean primaryRange, boolean repairedAt, String... columnFamilies) - throws IOException { + public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, + Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) + throws IOException { log(" forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException"); Map options = new HashMap(); return repairAsync(keyspace, options); @@ -794,18 +773,16 @@ public class StorageService extends NotificationBroadcasterSupport } @Override - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, boolean isSequential, - Collection dataCenters, Collection hosts, - boolean repairedAt, String... columnFamilies) throws IOException { + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, + Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) + throws IOException { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) throws IOException"); - return c.getIntValue(""); + return client.getIntValue(""); } @Override - public int forceRepairAsync(String keyspace, boolean isSequential, - boolean isLocal, boolean primaryRange, boolean fullRepair, - String... columnFamilies) { + public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, + boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); Map options = new HashMap(); return repairAsync(keyspace, options); @@ -813,17 +790,16 @@ public class StorageService extends NotificationBroadcasterSupport @Override @Deprecated - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, boolean isSequential, boolean isLocal, - boolean repairedAt, String... columnFamilies) { + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, + boolean isLocal, boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies)"); - return c.getIntValue(""); + return client.getIntValue(""); } @Override public void forceTerminateAllRepairSessions() { log(" forceTerminateAllRepairSessions()"); - c.post("/storage_service/force_terminate"); + client.post("/storage_service/force_terminate"); } /** @@ -832,7 +808,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public void decommission() throws InterruptedException { log(" decommission() throws InterruptedException"); - c.post("/storage_service/decommission"); + client.post("/storage_service/decommission"); } /** @@ -845,20 +821,22 @@ public class StorageService extends NotificationBroadcasterSupport log(" move(String newToken) throws IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "new_token", newToken); - c.post("/storage_service/move", queryParams); + client.post("/storage_service/move", queryParams); } /** * removeToken removes token (and all data associated with enpoint that had * it) from the ring - * @param hostIdString the host id to remove + * + * @param hostIdString + * the host id to remove */ @Override public void removeNode(String hostIdString) { log(" removeNode(String token)"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "host_id", hostIdString); - c.post("/storage_service/remove_node", queryParams); + client.post("/storage_service/remove_node", queryParams); } /** @@ -867,7 +845,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getRemovalStatus() { log(" getRemovalStatus()"); - return c.getStringValue("/storage_service/removal_status"); + return client.getStringValue("/storage_service/removal_status"); } /** @@ -876,7 +854,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public void forceRemoveCompletion() { log(" forceRemoveCompletion()"); - c.post("/storage_service/force_remove_completion"); + client.post("/storage_service/force_remove_completion"); } /** @@ -899,19 +877,18 @@ public class StorageService extends NotificationBroadcasterSupport * @see ch.qos.logback.classic.Level#toLevel(String) */ @Override - public void setLoggingLevel(String classQualifier, String level) - throws Exception { + public void setLoggingLevel(String classQualifier, String level) throws Exception { log(" setLoggingLevel(String classQualifier, String level) throws Exception"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "level", level); - c.post("/system/logger/" + classQualifier, queryParams); + client.post("/system/logger/" + classQualifier, queryParams); } /** get the runtime logging levels */ @Override public Map getLoggingLevels() { log(" getLoggingLevels()"); - return c.getMapStrValue("/storage_service/logging_level"); + return client.getMapStrValue("/storage_service/logging_level"); } /** @@ -921,14 +898,14 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String getOperationMode() { log(" getOperationMode()"); - return c.getStringValue("/storage_service/operation_mode"); + return client.getStringValue("/storage_service/operation_mode"); } /** Returns whether the storage service is starting or not */ @Override public boolean isStarting() { log(" isStarting()"); - return c.getBooleanValue("/storage_service/is_starting"); + return client.getBooleanValue("/storage_service/is_starting"); } /** get the progress of a drain operation */ @@ -938,7 +915,7 @@ public class StorageService extends NotificationBroadcasterSupport // FIXME // This is a workaround so the nodetool would work // it should be revert when the drain progress will be implemented - //return c.getStringValue("/storage_service/drain"); + // return c.getStringValue("/storage_service/drain"); return String.format("Drained %s/%s ColumnFamilies", 0, 0); } @@ -947,10 +924,9 @@ public class StorageService extends NotificationBroadcasterSupport * commitlog. */ @Override - public void drain() - throws IOException, InterruptedException, ExecutionException { + public void drain() throws IOException, InterruptedException, ExecutionException { log(" drain() throws IOException, InterruptedException, ExecutionException"); - c.post("/storage_service/drain"); + client.post("/storage_service/drain"); } /** @@ -966,12 +942,11 @@ public class StorageService extends NotificationBroadcasterSupport * The column family to delete data from. */ @Override - public void truncate(String keyspace, String columnFamily) - throws TimeoutException, IOException { + public void truncate(String keyspace, String columnFamily) throws TimeoutException, IOException { log(" truncate(String keyspace, String columnFamily)throws TimeoutException, IOException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "cf", columnFamily); - c.post("/storage_service/truncate/" + keyspace, queryParams); + client.post("/storage_service/truncate/" + keyspace, queryParams); } /** @@ -981,7 +956,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public Map getOwnership() { log(" getOwnership()"); - return c.getMapInetAddressFloatValue("/storage_service/ownership/"); + return client.getMapInetAddressFloatValue("/storage_service/ownership/"); } /** @@ -992,26 +967,26 @@ public class StorageService extends NotificationBroadcasterSupport * else a empty Map is returned. */ @Override - public Map effectiveOwnership(String keyspace) - throws IllegalStateException { + public Map effectiveOwnership(String keyspace) throws IllegalStateException { log(" effectiveOwnership(String keyspace) throws IllegalStateException"); try { - return c.getMapInetAddressFloatValue("/storage_service/ownership/" + keyspace); + return client.getMapInetAddressFloatValue("/storage_service/ownership/" + keyspace); } catch (Exception e) { - throw new IllegalStateException("Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless"); + throw new IllegalStateException( + "Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless"); } } @Override public List getKeyspaces() { log(" getKeyspaces()"); - return c.getListStrValue("/storage_service/keyspaces"); + return client.getListStrValue("/storage_service/keyspaces"); } public Map> getColumnFamilyPerKeyspace() { Map> res = new HashMap>(); - JsonArray mbeans = c.getJsonArray("/column_family/"); + JsonArray mbeans = client.getJsonArray("/column_family/"); for (int i = 0; i < mbeans.size(); i++) { JsonObject mbean = mbeans.getJsonObject(i); @@ -1030,7 +1005,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" getNonSystemKeyspaces()"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("type", "user"); - return c.getListStrValue("/storage_service/keyspaces", queryParams); + return client.getListStrValue("/storage_service/keyspaces", queryParams); } /** @@ -1050,114 +1025,109 @@ public class StorageService extends NotificationBroadcasterSupport * double, (default 0.0) */ @Override - public void updateSnitch(String epSnitchClassName, Boolean dynamic, - Integer dynamicUpdateInterval, Integer dynamicResetInterval, - Double dynamicBadnessThreshold) throws ClassNotFoundException { + public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, + Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException { log(" updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException"); MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_bool_query_param(queryParams, "dynamic", dynamic); - APIClient.set_query_param(queryParams, "epSnitchClassName", - epSnitchClassName); + APIClient.set_query_param(queryParams, "epSnitchClassName", epSnitchClassName); if (dynamicUpdateInterval != null) { - queryParams.add("dynamic_update_interval", - dynamicUpdateInterval.toString()); + queryParams.add("dynamic_update_interval", dynamicUpdateInterval.toString()); } if (dynamicResetInterval != null) { - queryParams.add("dynamic_reset_interval", - dynamicResetInterval.toString()); + queryParams.add("dynamic_reset_interval", dynamicResetInterval.toString()); } if (dynamicBadnessThreshold != null) { - queryParams.add("dynamic_badness_threshold", - dynamicBadnessThreshold.toString()); + queryParams.add("dynamic_badness_threshold", dynamicBadnessThreshold.toString()); } - c.post("/storage_service/update_snitch", queryParams); + client.post("/storage_service/update_snitch", queryParams); } // allows a user to forcibly 'kill' a sick node @Override public void stopGossiping() { log(" stopGossiping()"); - c.delete("/storage_service/gossiping"); + client.delete("/storage_service/gossiping"); } // allows a user to recover a forcibly 'killed' node @Override public void startGossiping() { log(" startGossiping()"); - c.post("/storage_service/gossiping"); + client.post("/storage_service/gossiping"); } // allows a user to see whether gossip is running or not @Override public boolean isGossipRunning() { log(" isGossipRunning()"); - return c.getBooleanValue("/storage_service/gossiping"); + return client.getBooleanValue("/storage_service/gossiping"); } // allows a user to forcibly completely stop cassandra @Override public void stopDaemon() { log(" stopDaemon()"); - c.post("/storage_service/stop_daemon"); + client.post("/storage_service/stop_daemon"); } // to determine if gossip is disabled @Override public boolean isInitialized() { log(" isInitialized()"); - return c.getBooleanValue("/storage_service/is_initialized"); + return client.getBooleanValue("/storage_service/is_initialized"); } // allows a user to disable thrift @Override public void stopRPCServer() { log(" stopRPCServer()"); - c.delete("/storage_service/rpc_server"); + client.delete("/storage_service/rpc_server"); } // allows a user to reenable thrift @Override public void startRPCServer() { log(" startRPCServer()"); - c.post("/storage_service/rpc_server"); + client.post("/storage_service/rpc_server"); } // to determine if thrift is running @Override public boolean isRPCServerRunning() { log(" isRPCServerRunning()"); - return c.getBooleanValue("/storage_service/rpc_server"); + return client.getBooleanValue("/storage_service/rpc_server"); } @Override public void stopNativeTransport() { log(" stopNativeTransport()"); - c.delete("/storage_service/native_transport"); + client.delete("/storage_service/native_transport"); } @Override public void startNativeTransport() { log(" startNativeTransport()"); - c.post("/storage_service/native_transport"); + client.post("/storage_service/native_transport"); } @Override public boolean isNativeTransportRunning() { log(" isNativeTransportRunning()"); - return c.getBooleanValue("/storage_service/native_transport"); + return client.getBooleanValue("/storage_service/native_transport"); } // allows a node that have been started without joining the ring to join it @Override public void joinRing() throws IOException { log(" joinRing() throws IOException"); - c.post("/storage_service/join_ring"); + client.post("/storage_service/join_ring"); } @Override public boolean isJoined() { log(" isJoined()"); - return c.getBooleanValue("/storage_service/join_ring"); + return client.getBooleanValue("/storage_service/join_ring"); } @Override @@ -1165,18 +1135,18 @@ public class StorageService extends NotificationBroadcasterSupport log(" setStreamThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Integer.toString(value)); - c.post("/storage_service/stream_throughput", queryParams); + client.post("/storage_service/stream_throughput", queryParams); } @Override public int getStreamThroughputMbPerSec() { log(" getStreamThroughputMbPerSec()"); - return c.getIntValue("/storage_service/stream_throughput"); + return client.getIntValue("/storage_service/stream_throughput"); } public int getCompactionThroughputMbPerSec() { log(" getCompactionThroughputMbPerSec()"); - return c.getIntValue("/storage_service/compaction_throughput"); + return client.getIntValue("/storage_service/compaction_throughput"); } @Override @@ -1184,13 +1154,13 @@ public class StorageService extends NotificationBroadcasterSupport log(" setCompactionThroughputMbPerSec(int value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Integer.toString(value)); - c.post("/storage_service/compaction_throughput", queryParams); + client.post("/storage_service/compaction_throughput", queryParams); } @Override public boolean isIncrementalBackupsEnabled() { log(" isIncrementalBackupsEnabled()"); - return c.getBooleanValue("/storage_service/incremental_backups"); + return client.getBooleanValue("/storage_service/incremental_backups"); } @Override @@ -1198,7 +1168,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" setIncrementalBackupsEnabled(boolean value)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("value", Boolean.toString(value)); - c.post("/storage_service/incremental_backups", queryParams); + client.post("/storage_service/incremental_backups", queryParams); } /** @@ -1217,9 +1187,9 @@ public class StorageService extends NotificationBroadcasterSupport if (sourceDc != null) { MultivaluedMap queryParams = new MultivaluedHashMap(); APIClient.set_query_param(queryParams, "source_dc", sourceDc); - c.post("/storage_service/rebuild", queryParams); + client.post("/storage_service/rebuild", queryParams); } else { - c.post("/storage_service/rebuild"); + client.post("/storage_service/rebuild"); } } @@ -1227,7 +1197,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public void bulkLoad(String directory) { log(" bulkLoad(String directory)"); - c.post("/storage_service/bulk_load/" + directory); + client.post("/storage_service/bulk_load/" + directory); } /** @@ -1237,14 +1207,13 @@ public class StorageService extends NotificationBroadcasterSupport @Override public String bulkLoadAsync(String directory) { log(" bulkLoadAsync(String directory)"); - return c.getStringValue( - "/storage_service/bulk_load_async/" + directory); + return client.getStringValue("/storage_service/bulk_load_async/" + directory); } @Override public void rescheduleFailedDeletions() { log(" rescheduleFailedDeletions()"); - c.post("/storage_service/reschedule_failed_deletions"); + client.post("/storage_service/reschedule_failed_deletions"); } /** @@ -1260,7 +1229,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" loadNewSSTables(String ksName, String cfName)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("cf", cfName); - c.post("/storage_service/sstables/" + ksName, queryParams); + client.post("/storage_service/sstables/" + ksName, queryParams); } /** @@ -1276,22 +1245,21 @@ public class StorageService extends NotificationBroadcasterSupport @Override public List sampleKeyRange() { log(" sampleKeyRange()"); - return c.getListStrValue("/storage_service/sample_key_range"); + return client.getListStrValue("/storage_service/sample_key_range"); } /** * rebuild the specified indexes */ @Override - public void rebuildSecondaryIndex(String ksName, String cfName, - String... idxNames) { + public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames) { log(" rebuildSecondaryIndex(String ksName, String cfName, String... idxNames)"); } @Override public void resetLocalSchema() throws IOException { log(" resetLocalSchema() throws IOException"); - c.post("/storage_service/relocal_schema"); + client.post("/storage_service/relocal_schema"); } /** @@ -1309,7 +1277,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" setTraceProbability(double probability)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("probability", Double.toString(probability)); - c.post("/storage_service/trace_probability", queryParams); + client.post("/storage_service/trace_probability", queryParams); } /** @@ -1318,28 +1286,24 @@ public class StorageService extends NotificationBroadcasterSupport @Override public double getTraceProbability() { log(" getTraceProbability()"); - return c.getDoubleValue("/storage_service/trace_probability"); + return client.getDoubleValue("/storage_service/trace_probability"); } @Override - public void disableAutoCompaction(String ks, String... columnFamilies) - throws IOException { + public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException { log("disableAutoCompaction(String ks, String... columnFamilies)"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); - c.delete("/storage_service/auto_compaction/", queryParams); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); + client.delete("/storage_service/auto_compaction/", queryParams); } @Override - public void enableAutoCompaction(String ks, String... columnFamilies) - throws IOException { + public void enableAutoCompaction(String ks, String... columnFamilies) throws IOException { log("enableAutoCompaction(String ks, String... columnFamilies)"); MultivaluedMap queryParams = new MultivaluedHashMap(); - APIClient.set_query_param(queryParams, "cf", - APIClient.join(columnFamilies)); + APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); try { - c.post("/storage_service/auto_compaction/", queryParams); + client.post("/storage_service/auto_compaction/", queryParams); } catch (RuntimeException e) { // FIXME should throw the right exception throw new IOException(e.getMessage()); @@ -1352,28 +1316,28 @@ public class StorageService extends NotificationBroadcasterSupport log(" deliverHints(String host) throws UnknownHostException"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("host", host); - c.post("/storage_service/deliver_hints", queryParams); + client.post("/storage_service/deliver_hints", queryParams); } /** Returns the name of the cluster */ @Override public String getClusterName() { log(" getClusterName()"); - return c.getStringValue("/storage_service/cluster_name"); + return client.getStringValue("/storage_service/cluster_name"); } /** Returns the cluster partitioner */ @Override public String getPartitionerName() { log(" getPartitionerName()"); - return c.getStringValue("/storage_service/partitioner_name"); + return client.getStringValue("/storage_service/partitioner_name"); } /** Returns the threshold for warning of queries with many tombstones */ @Override public int getTombstoneWarnThreshold() { log(" getTombstoneWarnThreshold()"); - return c.getIntValue("/storage_service/tombstone_warn_threshold"); + return client.getIntValue("/storage_service/tombstone_warn_threshold"); } /** Sets the threshold for warning queries with many tombstones */ @@ -1381,16 +1345,15 @@ public class StorageService extends NotificationBroadcasterSupport public void setTombstoneWarnThreshold(int tombstoneDebugThreshold) { log(" setTombstoneWarnThreshold(int tombstoneDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); - queryParams.add("debug_threshold", - Integer.toString(tombstoneDebugThreshold)); - c.post("/storage_service/tombstone_warn_threshold", queryParams); + queryParams.add("debug_threshold", Integer.toString(tombstoneDebugThreshold)); + client.post("/storage_service/tombstone_warn_threshold", queryParams); } /** Returns the threshold for abandoning queries with many tombstones */ @Override public int getTombstoneFailureThreshold() { log(" getTombstoneFailureThreshold()"); - return c.getIntValue("/storage_service/tombstone_failure_threshold"); + return client.getIntValue("/storage_service/tombstone_failure_threshold"); } /** Sets the threshold for abandoning queries with many tombstones */ @@ -1398,16 +1361,15 @@ public class StorageService extends NotificationBroadcasterSupport public void setTombstoneFailureThreshold(int tombstoneDebugThreshold) { log(" setTombstoneFailureThreshold(int tombstoneDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); - queryParams.add("debug_threshold", - Integer.toString(tombstoneDebugThreshold)); - c.post("/storage_service/tombstone_failure_threshold", queryParams); + queryParams.add("debug_threshold", Integer.toString(tombstoneDebugThreshold)); + client.post("/storage_service/tombstone_failure_threshold", queryParams); } /** Returns the threshold for rejecting queries due to a large batch size */ @Override public int getBatchSizeFailureThreshold() { log(" getBatchSizeFailureThreshold()"); - return c.getIntValue("/storage_service/batch_size_failure_threshold"); + return client.getIntValue("/storage_service/batch_size_failure_threshold"); } /** Sets the threshold for rejecting queries due to a large batch size */ @@ -1416,7 +1378,7 @@ public class StorageService extends NotificationBroadcasterSupport log(" setBatchSizeFailureThreshold(int batchSizeDebugThreshold)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("threshold", Integer.toString(batchSizeDebugThreshold)); - c.post("/storage_service/batch_size_failure_threshold", queryParams); + client.post("/storage_service/batch_size_failure_threshold", queryParams); } /** @@ -1427,74 +1389,73 @@ public class StorageService extends NotificationBroadcasterSupport log(" setHintedHandoffThrottleInKB(int throttleInKB)"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("throttle", Integer.toString(throttleInKB)); - c.post("/storage_service/hinted_handoff", queryParams); + client.post("/storage_service/hinted_handoff", queryParams); } @Override - public void takeMultipleColumnFamilySnapshot(String tag, - String... columnFamilyList) throws IOException { + public void takeMultipleColumnFamilySnapshot(String tag, String... columnFamilyList) throws IOException { log(" takeMultipleColumnFamilySnapshot"); Map> keyspaceColumnfamily = new HashMap>(); Map> kss = getColumnFamilyPerKeyspace(); Map>> snapshots = getSnapshotKeyspaceColumnFamily(); - for (String columnFamily : columnFamilyList) - { + for (String columnFamily : columnFamilyList) { String splittedString[] = columnFamily.split("\\."); - if (splittedString.length == 2) - { + if (splittedString.length == 2) { String keyspaceName = splittedString[0]; String columnFamilyName = splittedString[1]; - if (keyspaceName == null) + if (keyspaceName == null) { throw new IOException("You must supply a keyspace name"); - if (columnFamilyName == null) + } + if (columnFamilyName == null) { throw new IOException("You must supply a column family name"); - if (tag == null || tag.equals("")) + } + if (tag == null || tag.equals("")) { throw new IOException("You must supply a snapshot name."); - if (!kss.containsKey(keyspaceName)) - { + } + if (!kss.containsKey(keyspaceName)) { throw new IOException("Keyspace " + keyspaceName + " does not exist"); } if (!kss.get(keyspaceName).contains(columnFamilyName)) { - throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", keyspaceName, columnFamilyName)); + throw new IllegalArgumentException( + String.format("Unknown keyspace/cf pair (%s.%s)", keyspaceName, columnFamilyName)); } - // As there can be multiple column family from same keyspace check if snapshot exist for that specific + // As there can be multiple column family from same keyspace + // check if snapshot exist for that specific // columnfamily and not for whole keyspace - if (snapshots.containsKey(tag) && snapshots.get(tag).containsKey(keyspaceName) && snapshots.get(tag).get(keyspaceName).contains(columnFamilyName)) { + if (snapshots.containsKey(tag) && snapshots.get(tag).containsKey(keyspaceName) + && snapshots.get(tag).get(keyspaceName).contains(columnFamilyName)) { throw new IOException("Snapshot " + tag + " already exists."); } - if (!keyspaceColumnfamily.containsKey(keyspaceName)) - { + if (!keyspaceColumnfamily.containsKey(keyspaceName)) { keyspaceColumnfamily.put(keyspaceName, new ArrayList()); } - // Add Keyspace columnfamily to map in order to support atomicity for snapshot process. - // So no snapshot should happen if any one of the above conditions fail for any keyspace or columnfamily + // Add Keyspace columnfamily to map in order to support + // atomicity for snapshot process. + // So no snapshot should happen if any one of the above + // conditions fail for any keyspace or columnfamily keyspaceColumnfamily.get(keyspaceName).add(columnFamilyName); - } - else - { + } else { throw new IllegalArgumentException( "Cannot take a snapshot on secondary index or invalid column family name. You must supply a column family name in the form of keyspace.columnfamily"); } } - for (Entry> entry : keyspaceColumnfamily.entrySet()) - { - for (String columnFamily : entry.getValue()) + for (Entry> entry : keyspaceColumnfamily.entrySet()) { + for (String columnFamily : entry.getValue()) { takeColumnFamilySnapshot(entry.getKey(), columnFamily, tag); + } } } @Override - public int forceRepairAsync(String keyspace, int parallelismDegree, - Collection dataCenters, Collection hosts, - boolean primaryRange, boolean fullRepair, - String... columnFamilies) { + public int forceRepairAsync(String keyspace, int parallelismDegree, Collection dataCenters, + Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRange, fullRepair, columnFamilies)"); Map options = new HashMap(); Joiner commas = Joiner.on(","); @@ -1506,7 +1467,7 @@ public class StorageService extends NotificationBroadcasterSupport options.put("hosts", commas.join(hosts)); } options.put("primaryRange", Boolean.toString(primaryRange)); - options.put("incremental", Boolean.toString(!fullRepair)); + options.put("incremental", Boolean.toString(!fullRepair)); if (columnFamilies != null && columnFamilies.length > 0) { options.put("columnFamilies", commas.join(columnFamilies)); } @@ -1514,10 +1475,8 @@ public class StorageService extends NotificationBroadcasterSupport } @Override - public int forceRepairRangeAsync(String beginToken, String endToken, - String keyspaceName, int parallelismDegree, - Collection dataCenters, Collection hosts, - boolean fullRepair, String... columnFamilies) { + public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, + Collection dataCenters, Collection hosts, boolean fullRepair, String... columnFamilies) { log(" forceRepairRangeAsync(beginToken, endToken, keyspaceName, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)"); Map options = new HashMap(); Joiner commas = Joiner.on(","); @@ -1528,9 +1487,9 @@ public class StorageService extends NotificationBroadcasterSupport if (hosts != null) { options.put("hosts", commas.join(hosts)); } - options.put("incremental", Boolean.toString(!fullRepair)); + options.put("incremental", Boolean.toString(!fullRepair)); options.put("startToken", beginToken); - options.put("endToken", endToken); + options.put("endToken", endToken); return repairAsync(keyspaceName, options); } @@ -1539,14 +1498,14 @@ public class StorageService extends NotificationBroadcasterSupport return getHostIdMap(); } - @Override + @Override public Map getHostIdToEndpoint() { return getHostIdToAddressMap(); } @Override public void refreshSizeEstimates() throws ExecutionException { - // TODO Auto-generated method stub + // TODO Auto-generated method stub log(" refreshSizeEstimates"); } @@ -1560,14 +1519,14 @@ public class StorageService extends NotificationBroadcasterSupport @Override public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables) throws IOException, ExecutionException, InterruptedException { - // "jobs" not (yet) relevant for scylla. (though possibly useful...) + // "jobs" not (yet) relevant for scylla. (though possibly useful...) return forceKeyspaceCleanup(keyspaceName, tables); } @Override public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - // "jobs" not (yet) relevant for scylla. (though possibly useful...) + // "jobs" not (yet) relevant for scylla. (though possibly useful...) return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies); } @@ -1582,7 +1541,7 @@ public class StorageService extends NotificationBroadcasterSupport @Override public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs, String... tableNames) throws IOException, ExecutionException, InterruptedException { - // "jobs" not (yet) relevant for scylla. (though possibly useful...) + // "jobs" not (yet) relevant for scylla. (though possibly useful...) return upgradeSSTables(keyspaceName, excludeCurrentVersion, tableNames); } @@ -1591,12 +1550,12 @@ public class StorageService extends NotificationBroadcasterSupport log(" getNonLocalStrategyKeyspaces"); MultivaluedMap queryParams = new MultivaluedHashMap(); queryParams.add("type", "non_local_strategy"); - return c.getListStrValue("/storage_service/keyspaces", queryParams); + return client.getListStrValue("/storage_service/keyspaces", queryParams); } @Override public void setInterDCStreamThroughputMbPerSec(int value) { - // TODO Auto-generated method stub + // TODO Auto-generated method stub log(" setInterDCStreamThroughputMbPerSec"); }