diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index c208756..6ddf665 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -54,7 +54,14 @@ import javax.management.NotificationBroadcasterSupport; import javax.management.NotificationFilter; import javax.management.NotificationListener; import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; @@ -66,6 +73,7 @@ import com.google.common.base.Joiner; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.MetricsMBean; import com.scylladb.jmx.utils.FileUtils; +import com.google.common.base.Throwables; /** * This abstraction contains the token/identifier of this node on the identifier @@ -76,6 +84,39 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, private static final Logger logger = Logger.getLogger(StorageService.class.getName()); private static final Timer timer = new Timer("Storage Service Repair", true); + private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; + private static final String[] COUNTER_DESCS = new String[] + { "partition key in raw hex bytes", + "value of this partition for given sampler", + "value is within the error bounds plus or minus of this", + "the partition key turned into a human readable format" }; + private static final CompositeType COUNTER_COMPOSITE_TYPE; + private static final TabularType COUNTER_TYPE; + + private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; + private static final String[] SAMPLER_DESCS = new String[] + { "cardinality of partitions", + "list of counter results" }; + + private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; + private static final CompositeType SAMPLING_RESULT; + + static + { + try + { + OpenType[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; + COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes); + COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES); + + OpenType[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE }; + SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes); + } catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + private final NotificationBroadcasterSupport notificationBroadcasterSupport = new NotificationBroadcasterSupport(); @Override @@ -1749,9 +1790,39 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, APIClient.set_query_param(queryParams, "cf", APIClient.join(columnFamilies)); return client.getIntValue("/storage_service/keyspace_scrub/" + keyspaceName, queryParams); } + @Override public long getUptime() { log("getUptime()"); return client.getLongValue("/system/uptime_ms"); } + + @Override + public CompositeData getToppartitions(String sampler, List keyspaceFilters, List tableFilters, int duration, int capacity, int count) throws OpenDataException { + MultivaluedMap queryParams = new MultivaluedHashMap(); + APIClient.set_query_param(queryParams, "duration", Integer.toString(duration)); + APIClient.set_query_param(queryParams, "capacity", Integer.toString(capacity)); + APIClient.set_query_param(queryParams, "keyspace_filters", keyspaceFilters != null ? APIClient.join(keyspaceFilters.toArray(new String[0])) : null); + APIClient.set_query_param(queryParams, "table_filters", tableFilters != null ? APIClient.join(tableFilters.toArray(new String[0])) : null); + JsonObject result = client.getJsonObj("/storage_service/toppartitions", queryParams); + + JsonArray counters = result.getJsonArray((sampler.equalsIgnoreCase("reads")) ? "read" : "write"); + long cardinality = result.getJsonNumber((sampler.equalsIgnoreCase("reads")) ? "read_cardinality" : "write_cardinality").longValue(); + long size = 0; + TabularDataSupport tabularResult = new TabularDataSupport(COUNTER_TYPE); + + if (counters != null) { + size = (count > counters.size()) ? counters.size() : count; + for (int i = 0; i < size; i++) { + JsonObject counter = counters.getJsonObject(i); + tabularResult.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, + new Object[] { counter.getString("partition"), // raw + counter.getJsonNumber("count").longValue(), // count + counter.getJsonNumber("error").longValue(), // error + counter.getString("partition") })); // string + } + } + + return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[] { cardinality, tabularResult }); + } } diff --git a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java index 27ae3f9..1463b41 100644 --- a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -34,9 +34,11 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import javax.json.JsonObject; import javax.management.NotificationEmitter; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; +import javax.management.openmbean.OpenDataException; public interface StorageServiceMBean extends NotificationEmitter { /** @@ -883,6 +885,9 @@ public interface StorageServiceMBean extends NotificationEmitter { public List getSSTableInfo(String keyspace, String table); public List getSSTableInfo(); + /** retun the system uptime */ public long getUptime(); + + public CompositeData getToppartitions(String sampler, List keyspaceFilters, List tableFilters, int duration, int capacity, int count) throws OpenDataException; }