From 27313ee2c45bed8caf603537d809a7d7c3492084 Mon Sep 17 00:00:00 2001
From: Amnon Heiman
Date: Mon, 28 Jan 2019 16:35:05 +0200
Subject: [PATCH] ColumnFamilyStore: Add an implementation for table sampling

This patch adds the implementation of begin and finish local sampling
for a column family.

The Cassandra API and the Scylla API are implemented differently. In
Cassandra (and in the JMX interface) an external source starts and stops
the sampling. In Scylla, a single API call starts the sampling and
returns with the result. The Scylla API call always returns the sampling
of both the reads and the writes.

To bridge the difference, the begin sampling command uses a Future when
calling the API. The finish method waits for the future to complete.

Because of the different implementations, it is possible that two
consecutive calls will be made to start sampling, one for the reads and
one for the writes; similarly, two calls will be made to finish, one for
the reads and one for the writes. The implementation ignores the second
call to start and stores the result, so the second call to finish is
served from the stored result.

Note that the use of a future is only for safety: the way we expect it
to work, the caller of begin sampling will sleep anyhow while waiting
for the result.

To avoid breaking MBean compatibility, we piggyback the duration on top
of the sampler string. If no duration is given, a default duration is
used. This is also just a precaution; we will modify the nodetool
implementation to pass that information.

There is a known issue with cardinality that will need to be addressed.
Also, we return a value in the raw column to match what the Cassandra
JMX returns, but it is a duplicate of the partition key.

See scylladb/scylla#2811

Signed-off-by: Amnon Heiman
Message-Id: <20190128143505.5241-1-amnon@scylladb.com>
---
 .../cassandra/db/ColumnFamilyStore.java       | 151 +++++++++++++++++-
 1 file changed, 145 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8cc7bea..346f67b 100644
--- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,11 +30,16 @@ import static javax.json.Json.createObjectBuilder;
 
 import java.io.StringReader;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import javax.json.Json;
@@ -46,7 +51,13 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
 
 import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.MultivaluedMap;
@@ -55,6 +66,7 @@ import org.apache.cassandra.metrics.TableMetrics;
 import com.scylladb.jmx.api.APIClient;
 import com.scylladb.jmx.metrics.MetricsMBean;
 import com.sun.jmx.mbeanserver.JmxMBeanServer;
+import com.google.common.base.Throwables;
 
 public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStoreMBean {
     private static final Logger logger = Logger.getLogger(ColumnFamilyStore.class.getName());
@@ -62,6 +74,88 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
     private final String type;
     private final String keyspace;
     private final String name;
+    private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
+    private static final String[] COUNTER_DESCS = new String[]
+    { "partition key in raw hex bytes", // Table name and comments match Cassandra, we will use the partition key
+      "value of this partition for given sampler",
+      "value is within the error bounds plus or minus of this",
+      "the partition key turned into a human readable format" };
+    private static final CompositeType COUNTER_COMPOSITE_TYPE;
+    private static final TabularType COUNTER_TYPE;
+
+    private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
+    private static final String[] SAMPLER_DESCS = new String[]
+    { "cardinality of partitions",
+      "list of counter results" };
+
+    private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
+    private static final CompositeType SAMPLING_RESULT;
+
+    public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated";
+    public static final String SNAPSHOT_DROP_PREFIX = "dropped";
+    // Result of the most recent sampling request; null until one completes successfully
+    private JsonObject tableSamplerResult = null;
+
+    // Pending sampling request; null when no request is in flight
+    private Future<?> futureTableSamperResult = null;
+    private ExecutorService service = null;
+
+    static
+    {
+        try
+        {
+            OpenType<?>[] counterTypes = new OpenType<?>[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
+            COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes);
+            COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES);
+
+            OpenType<?>[] samplerTypes = new OpenType<?>[] { SimpleType.LONG, COUNTER_TYPE };
+            SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes);
+        } catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Starts an asynchronous sampling request unless one is already pending.
+     *
+     * @param queryParams query parameters passed to the toppartitions API call
+     */
+    protected synchronized void startTableSampling(MultivaluedMap<String, String> queryParams) {
+        if (futureTableSamperResult != null) {
+            return;
+        }
+        // Drop the previous result so a failed request is never answered with stale data
+        tableSamplerResult = null;
+        futureTableSamperResult = service.submit(() -> {
+            tableSamplerResult = client.getJsonObj("column_family/toppartitions/" + getCFName(), queryParams);
+            return null;
+        });
+    }
+
+    /**
+     * Waits until the pending sampling request, if any, has completed.
+     * It is safe to call this method multiple times.
+     *
+     * @throws RuntimeException if the request failed or the wait was interrupted
+     */
+    public synchronized void waitUntilSamplingCompleted() {
+        if (futureTableSamperResult == null) {
+            return;
+        }
+        try {
+            futureTableSamperResult.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Failed getting table statistics", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Failed getting table statistics", e.getCause());
+        } finally {
+            futureTableSamperResult = null;
+        }
+    }
+
 
     public static final Set<String> TYPE_NAMES = new HashSet<>(asList("ColumnFamilies", "IndexTables", "Tables"));
 
@@ -75,6 +169,7 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
         this.type = type;
         this.keyspace = keyspace;
         this.name = name;
+        service = Executors.newSingleThreadExecutor();
     }
 
     public ColumnFamilyStore(APIClient client, ObjectName name) {
@@ -416,16 +511,60 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
     }
 
+    /**
+     * Starts sampling this table; returns without waiting for the result.
+     *
+     * @param sampler_base sampler name, optionally followed by ":" and the sampling duration
+     * @param capacity value passed to the API as the "capacity" query parameter
+     */
     @Override
-    public void beginLocalSampling(String sampler, int capacity) {
-        // TODO Auto-generated method stub
+    public void beginLocalSampling(String sampler_base, int capacity) {
+        MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
+        queryParams.add("capacity", Integer.toString(capacity));
+        // The duration is piggybacked on the sampler name as "<sampler>:<duration>";
+        // fall back to the default when it is missing or empty (e.g. "reads:")
+        String[] parts = sampler_base.split(":");
+        boolean hasDuration = parts.length > 1 && !parts[1].isEmpty();
+        queryParams.add("duration", hasDuration ? parts[1] : "10000");
+        startTableSampling(queryParams);
         log(" beginLocalSampling()");
-
     }
 
+    /**
+     * Waits for the sampling started by beginLocalSampling and returns its result.
+     *
+     * @param samplerType "reads" (case-insensitive) selects the read counters, anything else the write counters
+     * @param count not used
+     * @return composite holding the number of returned counters (standing in for the cardinality) and the counter table
+     * @throws IllegalStateException if no sampling result is available
+     * @throws OpenDataException if the result cannot be converted to open data
+     */
     @Override
-    public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException {
-        // TODO Auto-generated method stub
+    public CompositeData finishLocalSampling(String samplerType, int count) throws OpenDataException {
         log(" finishLocalSampling()");
-        return null;
+
+        waitUntilSamplingCompleted();
+        // Work on a local snapshot: the field is reset whenever a new sampling request starts
+        JsonObject samplingResult = tableSamplerResult;
+        if (samplingResult == null) {
+            throw new IllegalStateException("No sampling result available, call beginLocalSampling first");
+        }
+
+        TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
+
+        JsonArray counters = samplingResult.getJsonArray("reads".equalsIgnoreCase(samplerType) ? "read" : "write");
+        long size = 0;
+        if (counters != null) {
+            size = counters.size();
+            for (int i = 0; i < counters.size(); i++) {
+                JsonObject counter = counters.getJsonObject(i);
+                result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES,
+                        new Object[] { counter.getString("partition"), // raw
+                                counter.getJsonNumber("count").longValue(), // count
+                                counter.getJsonNumber("error").longValue(), // error
+                                counter.getString("partition") })); // string
+            }
+        }
+        //FIXME: size is not the cardinality, a true value needs to be propagated
+        return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[] { size, result });
     }
 }