ColumnFamilyStore: Add an implementation for table sampling

This patch adds the implementation for begin and finish local sampling
of a column family.

There is a difference in the implementation of Cassandra API and Scylla.

In Cassandra and the JMX an external source start and stop the sampling.

In Scylla, a single API call start the sampling and return with the
result. In Scylla the API call always return sampling of the read and of
the writes.

To bridge the difference, the begin sampling command will use a Future
when calling the API. The finish method will wait for the future to end.

Because of the different implementation, it is possible that two
consecutive calls will be made to start sampling one for the read and
one for the write, similarly, two calls will be made to finish for read
and write.

The implementation would ignore the second call to start and will
store the result, so the second call to finish will be served from the
stored result.

Note, that the use of future is only for safety, the way we expect it to
work, the caller to the begin sampling will sleep anyhow while waiting
for the result.

To avoid breaking the MBean compatibility we piggyback the duration on
top of the sampler string.

If no duration is given, a default duration will be taken, this is also
just as a precaution, we will modify the nodetool implementation to
pass that information.

There is a known issue with cardinality, that will need to be addressed.
Also we return a value in the raw column to match what Cassandra JMX
returns, but it's a duplication of the partition key.

See scylladb/scylla#2811

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <20190128143505.5241-1-amnon@scylladb.com>
This commit is contained in:
Amnon Heiman 2019-01-28 16:35:05 +02:00 committed by Avi Kivity
parent d4493295ff
commit 27313ee2c4

View File

@ -30,11 +30,16 @@ import static javax.json.Json.createObjectBuilder;
import java.io.StringReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.json.Json;
@ -46,7 +51,13 @@ import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;
@ -55,6 +66,7 @@ import org.apache.cassandra.metrics.TableMetrics;
import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.MetricsMBean;
import com.sun.jmx.mbeanserver.JmxMBeanServer;
import com.google.common.base.Throwables;
public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStoreMBean {
private static final Logger logger = Logger.getLogger(ColumnFamilyStore.class.getName());
@ -62,6 +74,72 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
private final String type;
private final String keyspace;
private final String name;
private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
private static final String[] COUNTER_DESCS = new String[]
{ "partition key in raw hex bytes", // Table name and comments match Cassandra, we will use the partition key
"value of this partition for given sampler",
"value is within the error bounds plus or minus of this",
"the partition key turned into a human readable format" };
private static final CompositeType COUNTER_COMPOSITE_TYPE;
private static final TabularType COUNTER_TYPE;
private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
private static final String[] SAMPLER_DESCS = new String[]
{ "cardinality of partitions",
"list of counter results" };
private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
private static final CompositeType SAMPLING_RESULT;
public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated";
public static final String SNAPSHOT_DROP_PREFIX = "dropped";
private JsonObject tableSamplerResult = null;
private Future<JsonObject> futureTableSamperResult = null;
private ExecutorService service = null;
static
{
try
{
OpenType<?>[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes);
COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES);
OpenType<?>[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE };
SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes);
} catch (OpenDataException e)
{
throw Throwables.propagate(e);
}
}
protected synchronized void startTableSampling(MultivaluedMap<String, String> queryParams) {
if (futureTableSamperResult != null) {
return;
}
futureTableSamperResult = service.submit(() -> {
tableSamplerResult = client.getJsonObj("column_family/toppartitions/" + getCFName(), queryParams);
return null;
});
}
/*
* Wait until the action is completed
* It is safe to call this method multiple times
*/
public synchronized void waitUntilSamplingCompleted() {
try {
if (futureTableSamperResult != null) {
futureTableSamperResult.get();
futureTableSamperResult = null;
}
} catch (InterruptedException | ExecutionException e) {
futureTableSamperResult = null;
throw new RuntimeException("Failed getting table statistics");
}
}
public static final Set<String> TYPE_NAMES = new HashSet<>(asList("ColumnFamilies", "IndexTables", "Tables"));
@ -75,6 +153,7 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
this.type = type;
this.keyspace = keyspace;
this.name = name;
service = Executors.newSingleThreadExecutor();
}
public ColumnFamilyStore(APIClient client, ObjectName name) {
@ -416,16 +495,41 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
}
@Override
public void beginLocalSampling(String sampler, int capacity) {
// TODO Auto-generated method stub
public void beginLocalSampling(String sampler_base, int capacity) {
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<String, String>();
queryParams.add("capacity", Integer.toString(capacity));
if (sampler_base.contains(":")) {
String[] parts = sampler_base.split(":");
queryParams.add("duration", parts[1]);
} else {
queryParams.add("duration", "10000");
}
startTableSampling(queryParams);
log(" beginLocalSampling()");
}
@Override
public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException {
// TODO Auto-generated method stub
public CompositeData finishLocalSampling(String samplerType, int count) throws OpenDataException {
log(" finishLocalSampling()");
return null;
waitUntilSamplingCompleted();
TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
JsonArray counters = tableSamplerResult.getJsonArray((samplerType.equalsIgnoreCase("reads")) ? "read" : "write");
long size = 0;
if (counters != null) {
size = counters.size();
for (int i = 0; i < counters.size(); i++) {
JsonObject counter = counters.getJsonObject(i);
result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES,
new Object[] { counter.getString("partition"), // raw
counter.getJsonNumber("count").longValue(), // count
counter.getJsonNumber("error").longValue(), // error
counter.getString("partition") })); // string
}
}
//FIXME: size is not the cardinality, a true value needs to be propogated
return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[] { size, result });
}
}