diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8cc7bea..346f67b 100644 --- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,11 +30,16 @@ import static javax.json.Json.createObjectBuilder; import java.io.StringReader; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import javax.json.Json; @@ -46,7 +51,13 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; @@ -55,6 +66,7 @@ import org.apache.cassandra.metrics.TableMetrics; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.MetricsMBean; import com.sun.jmx.mbeanserver.JmxMBeanServer; +import com.google.common.base.Throwables; public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStoreMBean { private static final Logger logger = Logger.getLogger(ColumnFamilyStore.class.getName()); @@ -62,6 +74,72 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore private final String type; private final String keyspace; private final String name; + private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; + private static final String[] COUNTER_DESCS = new String[] + { "partition key in raw hex bytes", // Table name and comments match Cassandra, we will use the partition key + "value of this partition for given sampler", + "value is within the error bounds plus or minus of this", + "the partition key turned into a human readable format" }; + private static final CompositeType COUNTER_COMPOSITE_TYPE; + private static final TabularType COUNTER_TYPE; + + private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"}; + private static final String[] SAMPLER_DESCS = new String[] + { "cardinality of partitions", + "list of counter results" }; + + private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS"; + private static final CompositeType SAMPLING_RESULT; + + public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated"; + public static final String SNAPSHOT_DROP_PREFIX = "dropped"; + private JsonObject tableSamplerResult = null; + + private Future futureTableSamperResult = null; + private ExecutorService service = null; + + static + { + try + { + OpenType[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }; + COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes); + COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES); + + OpenType[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE }; + SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes); + } catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + protected synchronized void startTableSampling(MultivaluedMap queryParams) { + if (futureTableSamperResult != null) { + return; + } + futureTableSamperResult = service.submit(() -> { + tableSamplerResult = client.getJsonObj("column_family/toppartitions/" + getCFName(), queryParams); + return null; + }); + } + + /* + * Wait until the action is completed + * It is safe to call this method multiple times + */ + public synchronized void waitUntilSamplingCompleted() { + try { + if (futureTableSamperResult != null) { + futureTableSamperResult.get(); + futureTableSamperResult = null; + } + } catch (InterruptedException | ExecutionException e) { + futureTableSamperResult = null; + throw new RuntimeException("Failed getting table statistics"); + } + } + public static final Set TYPE_NAMES = new HashSet<>(asList("ColumnFamilies", "IndexTables", "Tables")); @@ -75,6 +153,7 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore this.type = type; this.keyspace = keyspace; this.name = name; + service = Executors.newSingleThreadExecutor(); } public ColumnFamilyStore(APIClient client, ObjectName name) { @@ -416,16 +495,41 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore } @Override - public void beginLocalSampling(String sampler, int capacity) { - // TODO Auto-generated method stub + public void beginLocalSampling(String sampler_base, int capacity) { + MultivaluedMap queryParams = new MultivaluedHashMap(); + queryParams.add("capacity", Integer.toString(capacity)); + if (sampler_base.contains(":")) { + String[] parts = sampler_base.split(":"); + queryParams.add("duration", parts[1]); + } else { + queryParams.add("duration", "10000"); + } + startTableSampling(queryParams); log(" beginLocalSampling()"); - } @Override - public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException { - // TODO Auto-generated method stub + public CompositeData finishLocalSampling(String samplerType, int count) throws OpenDataException { log(" finishLocalSampling()"); - return null; + + waitUntilSamplingCompleted(); + + TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); + + JsonArray counters = tableSamplerResult.getJsonArray((samplerType.equalsIgnoreCase("reads")) ? "read" : "write"); + long size = 0; + if (counters != null) { + size = counters.size(); + for (int i = 0; i < counters.size(); i++) { + JsonObject counter = counters.getJsonObject(i); + result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, + new Object[] { counter.getString("partition"), // raw + counter.getJsonNumber("count").longValue(), // count + counter.getJsonNumber("error").longValue(), // error + counter.getString("partition") })); // string + } + } + //FIXME: size is not the cardinality, a true value needs to be propogated + return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[] { size, result }); } }