Merge "Removing counter pulling from the JMX" from Amnon

"This series uses the API that was added to scylla to remove the counter
 pulling and rely on the statistics collected by the API.

 The series extends the APIMeter, APIHistogram and APITimer to remove
 their pulling part and to fetch the information when needed from the
 API.

 For performence reason those objects will be cached, so that in the
 typical case of of multiple requests of different fields will cause a
 single API call."
This commit is contained in:
Pekka Enberg 2016-05-24 17:20:52 +03:00
commit 4eb02743cd
14 changed files with 317 additions and 144 deletions

View File

@ -64,6 +64,14 @@ public class APIClient {
return (value!= null && value.valid(duration))? value.stringValue() : null;
}
JsonObject getJsonObjectFromCache(String key, long duration) {
if (key == null) {
return null;
}
CacheEntry value = cache.get(key);
return (value!= null && value.valid(duration))? value.jsonObject() : null;
}
EstimatedHistogram getEstimatedHistogramFromCache(String key, long duration) {
if (key == null) {
return null;
@ -654,20 +662,30 @@ public class APIClient {
}
public JsonObject getJsonObj(String string,
MultivaluedMap<String, String> queryParams) {
MultivaluedMap<String, String> queryParams, long duration) {
if (string.equals("")) {
return null;
}
String key = getCacheKey(string, queryParams, duration);
JsonObject res = getJsonObjectFromCache(key, duration);
if (res != null) {
return res;
}
JsonReader reader = getReader(string, queryParams);
JsonObject res = reader.readObject();
res = reader.readObject();
reader.close();
if (duration > 0) {
cache.put(key, new CacheEntry(res));
}
return res;
}
public HistogramValues getHistogramValue(String url,
public JsonObject getJsonObj(String string,
MultivaluedMap<String, String> queryParams) {
return getJsonObj(string, queryParams, 0);
}
public static HistogramValues json2histogram(JsonObject obj) {
HistogramValues res = new HistogramValues();
JsonObject obj = getJsonObj(url, queryParams);
res.count = obj.getJsonNumber("count").longValue();
res.max = obj.getJsonNumber("max").longValue();
res.min = obj.getJsonNumber("min").longValue();
@ -684,6 +702,11 @@ public class APIClient {
return res;
}
public HistogramValues getHistogramValue(String url,
MultivaluedMap<String, String> queryParams) {
return json2histogram(getJsonObj(url, queryParams));
}
public HistogramValues getHistogramValue(String url) {
return getHistogramValue(url, null);
}

View File

@ -21,6 +21,8 @@
package com.scylladb.jmx.api;
import javax.json.JsonObject;
import com.scylladb.jmx.utils.EstimatedHistogram;
public class CacheEntry {
@ -43,4 +45,8 @@ public class CacheEntry {
public EstimatedHistogram getEstimatedHistogram() {
return (EstimatedHistogram)value;
}
public JsonObject jsonObject() {
return (JsonObject) value;
}
}

View File

@ -16,6 +16,7 @@ import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.reporting.JmxReporter;
import com.yammer.metrics.core.APIMeter;
public class APIMetrics {
private static final APIMetricsRegistry DEFAULT_REGISTRY = new APIMetricsRegistry();
@ -241,7 +242,7 @@ public class APIMetrics {
* the rate unit of the new meter
* @return a new {@link com.yammer.metrics.core.Meter}
*/
public static Meter newMeter(String url, Class<?> klass, String name,
public static APIMeter newMeter(String url, Class<?> klass, String name,
String eventType, TimeUnit unit) {
return DEFAULT_REGISTRY.newMeter(url, klass, name, eventType, unit);
}
@ -263,7 +264,7 @@ public class APIMetrics {
* the rate unit of the new meter
* @return a new {@link com.yammer.metrics.core.Meter}
*/
public static Meter newMeter(String url, Class<?> klass, String name,
public static APIMeter newMeter(String url, Class<?> klass, String name,
String scope, String eventType, TimeUnit unit) {
return DEFAULT_REGISTRY.newMeter(url, klass, name, scope, eventType,
unit);
@ -282,28 +283,11 @@ public class APIMetrics {
* the rate unit of the new meter
* @return a new {@link com.yammer.metrics.core.Meter}
*/
public static Meter newMeter(String url, MetricName metricName,
public static APIMeter newMeter(String url, MetricName metricName,
String eventType, TimeUnit unit) {
return DEFAULT_REGISTRY.newMeter(url, metricName, eventType, unit);
}
/**
* Creates a new {@link com.yammer.metrics.core.Meter} and registers it
* under the given metric name.
*
* @param metricName
* the name of the metric
* @param eventType
* the plural name of the type of events the meter is measuring
* (e.g., {@code "requests"})
* @param unit
* the rate unit of the new meter
* @return a new {@link com.yammer.metrics.core.Meter}
*/
public static Meter newSettableMeter(MetricName metricName,
String eventType, TimeUnit unit) {
return DEFAULT_REGISTRY.newSettableMeter(metricName, eventType, unit);
}
/**
* Creates a new {@link com.yammer.metrics.core.APITimer} and registers it
* under the given class and name.

View File

@ -10,6 +10,8 @@ import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.json.JsonObject;
import com.scylladb.jmx.api.APIClient;
import com.yammer.metrics.stats.Sample;
import com.yammer.metrics.stats.Snapshot;
@ -101,14 +103,7 @@ public class APIHistogram extends Histogram {
this(url, type, UPDATE_INTERVAL);
}
public void update() {
long now = System.currentTimeMillis();
if (now - last_update < UPDATE_INTERVAL) {
return;
}
last_update = now;
clear();
HistogramValues vals = c.getHistogramValue(url);
public void updateValue(HistogramValues vals) {
try {
if (vals.sample != null) {
for (long v : vals.sample) {
@ -128,6 +123,25 @@ public class APIHistogram extends Histogram {
}
}
public void update() {
if (url == null) {
return;
}
long now = System.currentTimeMillis();
if (now - last_update < UPDATE_INTERVAL) {
return;
}
last_update = now;
clear();
JsonObject obj = c.getJsonObj(url, null);
if (obj.containsKey("hist")) {
updateValue(APIClient.json2histogram(obj.getJsonObject("hist")));
} else {
updateValue(APIClient.json2histogram(obj));
}
}
/**
* Returns the number of values recorded.
*

View File

@ -1,34 +1,113 @@
package com.yammer.metrics.core;
/*
* Copyright (C) 2015 ScyllaDB
*/
import java.util.concurrent.ScheduledExecutorService;
/*
* Copyright 2015 Cloudius Systems
* This file is part of Scylla.
*
* Modified by Cloudius Systems
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* Modified by ScyllaDB
*/
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.json.JsonArray;
import javax.json.JsonObject;
import com.scylladb.jmx.api.APIClient;
public class APIMeter extends APISettableMeter {
String url;
private APIClient c = new APIClient();
public class APIMeter extends Meter {
public final static long CACHE_DURATION = 1000;
public APIMeter(String _url, ScheduledExecutorService tickThread,
String eventType, TimeUnit rateUnit, Clock clock) {
super(tickThread, eventType, rateUnit, clock);
// TODO Auto-generated constructor stub
url = _url;
String url;
String eventType;
TimeUnit rateUnit;
APIClient c = new APIClient();
long count;
double oneMinuteRate;
double fiveMinuteRate;
double fifteenMinuteRate;
double meanRate;
public APIMeter(String url, ScheduledExecutorService tickThread,
String eventType, TimeUnit rateUnit) {
super(tickThread, eventType, rateUnit, Clock.defaultClock());
super.stop();
this.url = url;
this.eventType = eventType;
this.rateUnit = rateUnit;
}
public long get_value() {
return c.getLongValue(url);
public void fromJson(JsonObject obj) {
JsonArray rates = obj.getJsonArray("rates");
int i = 0;
oneMinuteRate = rates.getJsonNumber(i++).doubleValue();
fiveMinuteRate = rates.getJsonNumber(i++).doubleValue();
fifteenMinuteRate = rates.getJsonNumber(i++).doubleValue();
meanRate = obj.getJsonNumber("mean_rate").doubleValue();
count = obj.getJsonNumber("count").longValue();
}
public void update_fields() {
if (url != null) {
fromJson(c.getJsonObj(url, null, CACHE_DURATION));
}
}
@Override
public void tick() {
set(get_value());
super.tick();
public TimeUnit rateUnit() {
return rateUnit;
}
@Override
public String eventType() {
return eventType;
}
@Override
public long count() {
update_fields();
return count;
}
@Override
public double fifteenMinuteRate() {
update_fields();
return fifteenMinuteRate;
}
@Override
public double fiveMinuteRate() {
update_fields();
return fiveMinuteRate;
}
@Override
public double meanRate() {
update_fields();
return meanRate;
}
@Override
public double oneMinuteRate() {
update_fields();
return oneMinuteRate;
}
}

View File

@ -128,7 +128,7 @@ public class APIMetricsRegistry extends MetricsRegistry {
* the rate unit of the new meter
* @return a new {@link Meter}
*/
public Meter newMeter(String url, Class<?> klass, String name,
public APIMeter newMeter(String url, Class<?> klass, String name,
String eventType, TimeUnit unit) {
return newMeter(url, klass, name, null, eventType, unit);
}
@ -150,7 +150,7 @@ public class APIMetricsRegistry extends MetricsRegistry {
* the rate unit of the new meter
* @return a new {@link Meter}
*/
public Meter newMeter(String url, Class<?> klass, String name,
public APIMeter newMeter(String url, Class<?> klass, String name,
String scope, String eventType, TimeUnit unit) {
return newMeter(url, createName(klass, name, scope), eventType, unit);
}
@ -171,14 +171,14 @@ public class APIMetricsRegistry extends MetricsRegistry {
* the rate unit of the new meter
* @return a new {@link Meter}
*/
public Meter newMeter(String url, MetricName metricName, String eventType,
public APIMeter newMeter(String url, MetricName metricName, String eventType,
TimeUnit unit) {
final Metric existingMetric = getMetrics().get(metricName);
if (existingMetric != null) {
return (Meter) existingMetric;
return (APIMeter) existingMetric;
}
return getOrAdd(metricName, new APIMeter(url, newMeterTickThreadPool(),
eventType, unit, getClock()));
eventType, unit));
}
/**
@ -378,7 +378,7 @@ public class APIMetricsRegistry extends MetricsRegistry {
return (Timer) existingMetric;
}
return getOrAdd(metricName, new APITimer(url, newMeterTickThreadPool(),
durationUnit, rateUnit, getClock()));
durationUnit, rateUnit));
}
}

View File

@ -4,41 +4,127 @@
*/
package com.yammer.metrics.core;
import java.lang.reflect.Field;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.json.JsonObject;
import com.scylladb.jmx.api.APIClient;
import com.yammer.metrics.core.Histogram.SampleType;
import com.yammer.metrics.stats.Snapshot;
/**
* A timer metric which aggregates timing durations and provides duration
* statistics, plus throughput statistics via {@link Meter}.
*/
public class APITimer extends Timer {
public final static long CACHE_DURATION = 1000;
final TimeUnit durationUnit, rateUnit;
final APIMeter meter;
final APIHistogram histogram;
APIClient c = new APIClient();
String url;
public APITimer(String url, ScheduledExecutorService tickThread,
TimeUnit durationUnit, TimeUnit rateUnit) {
super(tickThread, durationUnit, rateUnit);
setHistogram(url);
super.stop();
this.url = url;
this.durationUnit = durationUnit;
this.rateUnit = rateUnit;
meter = new APIMeter(null, tickThread, "calls", rateUnit);
histogram = new APIHistogram(null, SampleType.BIASED);
}
public APITimer(String url, ScheduledExecutorService tickThread,
TimeUnit durationUnit, TimeUnit rateUnit, Clock clock) {
super(tickThread, durationUnit, rateUnit, clock);
setHistogram(url);
public void fromJson(JsonObject obj) {
meter.fromJson(obj.getJsonObject("meter"));
histogram.updateValue(APIClient.json2histogram(obj.getJsonObject("hist")));
}
private void setHistogram(String url) {
Field histogram;
try {
histogram = Timer.class.getDeclaredField("histogram");
histogram.setAccessible(true);
histogram.set(this, new APIHistogram(url, SampleType.BIASED));
} catch (NoSuchFieldException | SecurityException
| IllegalArgumentException | IllegalAccessException e) {
// TODO Auto-generated catch block
e.printStackTrace();
public void update_fields() {
if (url != null) {
fromJson(c.getJsonObj(url, null, CACHE_DURATION));
}
}
@Override
public double max() {
update_fields();
return histogram.max();
}
@Override
public double min() {
update_fields();
return histogram.min();
}
@Override
public double mean() {
update_fields();
return histogram.mean();
}
@Override
public double stdDev() {
update_fields();
return histogram.stdDev();
}
@Override
public double sum() {
update_fields();
return 0;
}
@Override
public Snapshot getSnapshot() {
update_fields();
return histogram.getSnapshot();
}
@Override
public TimeUnit rateUnit() {
update_fields();
return meter.rateUnit();
}
@Override
public String eventType() {
update_fields();
return meter.eventType();
}
@Override
public long count() {
update_fields();
return meter.count();
}
@Override
public double fifteenMinuteRate() {
update_fields();
return meter.fifteenMinuteRate();
}
@Override
public double fiveMinuteRate() {
update_fields();
return meter.fiveMinuteRate();
}
@Override
public double meanRate() {
update_fields();
return meter.meanRate();
}
@Override
public double oneMinuteRate() {
update_fields();
return meter.oneMinuteRate();
}
}

View File

@ -31,7 +31,7 @@ import com.scylladb.jmx.metrics.APIMetrics;
import com.scylladb.jmx.metrics.DefaultNameFactory;
import com.scylladb.jmx.metrics.MetricNameFactory;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.APIMeter;
/**
* Metrics for {@code ICache}.
@ -40,9 +40,9 @@ public class CacheMetrics {
/** Cache capacity in bytes */
public final Gauge<Long> capacity;
/** Total number of cache hits */
public final Meter hits;
public final APIMeter hits;
/** Total number of cache requests */
public final Meter requests;
public final APIMeter requests;
/** cache hit rate */
public final Gauge<Double> hitRate;
/** Total size of cache, in bytes */
@ -55,6 +55,12 @@ public class CacheMetrics {
private APIClient c = new APIClient();
private String getURL(String url, String value) {
if (url == null || value == null) {
return null;
}
return "/cache_service/metrics/" + url + value;
}
/**
* Create metrics for given cache.
*
@ -66,39 +72,50 @@ public class CacheMetrics {
public CacheMetrics(String type, final String url) {
MetricNameFactory factory = new DefaultNameFactory("Cache", type);
capacity = APIMetrics.newGauge(factory.createMetricName("Capacity"),
new Gauge<Long>() {
String u = getURL(url, "/capacity");
public Long value() {
return c.getLongValue("/cache_service/metrics/" + url
+ "/capacity");
if (u == null) {
return 0L;
}
return c.getLongValue(u);
}
});
hits = APIMetrics.newMeter("/cache_service/metrics/" + url
+ "/hits", factory.createMetricName("Hits"), "hits",
hits = APIMetrics.newMeter(getURL(url, "/hits_moving_avrage"), factory.createMetricName("Hits"), "hits",
TimeUnit.SECONDS);
requests = APIMetrics.newMeter("/cache_service/metrics/" + url
+ "/requests", factory.createMetricName("Requests"),
requests = APIMetrics.newMeter(getURL(url, "/requests_moving_avrage"), factory.createMetricName("Requests"),
"requests", TimeUnit.SECONDS);
hitRate = APIMetrics.newGauge(factory.createMetricName("HitRate"),
new Gauge<Double>() {
String u = getURL(url, "/hit_rate");
@Override
public Double value() {
return c.getDoubleValue("/cache_service/metrics/" + url
+ "/hit_rate");
if (u == null) {
return 0.0;
}
return c.getDoubleValue(u);
}
});
size = APIMetrics.newGauge(factory.createMetricName("Size"),
new Gauge<Long>() {
String u = getURL(url, "/size");
public Long value() {
return c.getLongValue("/cache_service/metrics/" + url
+ "/size");
if (u == null) {
return 0L;
}
return c.getLongValue(u);
}
});
entries = APIMetrics.newGauge(factory.createMetricName("Entries"),
new Gauge<Integer>() {
String u = getURL(url, "/entries");
public Integer value() {
return c.getIntValue("/cache_service/metrics/" + url
+ "/entries");
if (u == null) {
return 0;
}
return c.getIntValue(u);
}
});
}

View File

@ -59,12 +59,13 @@ public class ClientRequestMetrics extends LatencyMetrics {
public ClientRequestMetrics(String url, String scope) {
super(url, "ClientRequest", scope);
timeouts = APIMetrics.newMeter(url + "/timeouts",
timeouts = APIMetrics.newMeter(url + "/timeouts_rates",
factory.createMetricName("Timeouts"), "timeouts",
TimeUnit.SECONDS);
unavailables = APIMetrics.newMeter(url + "/unavailables",
unavailables = APIMetrics.newMeter(url + "/unavailables_rates",
factory.createMetricName("Unavailables"), "unavailables",
TimeUnit.SECONDS);
}
public void release() {

View File

@ -18,7 +18,7 @@
/*
* Copyright 2015 Cloudius Systems
*
*
* Modified by Cloudius Systems
*/
package org.apache.cassandra.metrics;
@ -31,7 +31,7 @@ import com.scylladb.jmx.metrics.DefaultNameFactory;
import com.scylladb.jmx.metrics.MetricNameFactory;
import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.APIMeter;
/**
* Metrics for compaction.
@ -45,7 +45,7 @@ public class CompactionMetrics {
/** Number of completed compactions since server [re]start */
public final Gauge<Long> completedTasks;
/** Total number of compactions since server [re]start */
public final Meter totalCompactionsCompleted;
public final APIMeter totalCompactionsCompleted;
/** Total number of bytes compacted since server [re]start */
public final Counter bytesCompacted;

View File

@ -31,21 +31,21 @@ import org.apache.cassandra.net.MessagingService;
import com.scylladb.jmx.metrics.APIMetrics;
import com.scylladb.jmx.metrics.DefaultNameFactory;
import com.scylladb.jmx.metrics.MetricNameFactory;
import com.yammer.metrics.core.APISettableMeter;
import com.yammer.metrics.core.APIMeter;
/**
* Metrics for dropped messages by verb.
*/
public class DroppedMessageMetrics {
/** Number of dropped messages */
public final APISettableMeter dropped;
public final APIMeter dropped;
private long lastDropped = 0;
public DroppedMessageMetrics(MessagingService.Verb verb) {
MetricNameFactory factory = new DefaultNameFactory("DroppedMessage",
verb.toString());
dropped = (APISettableMeter) APIMetrics.newSettableMeter(
dropped = (APIMeter) APIMetrics.newMeter(null,
factory.createMetricName("Dropped"), "dropped",
TimeUnit.SECONDS);
dropped.stop();
@ -59,7 +59,7 @@ public class DroppedMessageMetrics {
return (int) recentlyDropped;
}
public APISettableMeter getMeter() {
public APIMeter getMeter() {
return dropped;
}
}

View File

@ -108,7 +108,7 @@ public class LatencyMetrics {
this.namePrefix = namePrefix;
paramName = (paramName == null)? "" : "/" + paramName;
latency = APIMetrics.newTimer(url + "/histogram" + paramName,
latency = APIMetrics.newTimer(url + "/moving_average_histogram" + paramName,
factory.createMetricName(namePrefix + "Latency"),
TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
totalLatency = APIMetrics.newCounter(url + paramName,
@ -135,12 +135,7 @@ public class LatencyMetrics {
/** takes nanoseconds **/
public void addNano(long nanos) {
// convert to microseconds. 1 millionth
latency.update(nanos, TimeUnit.NANOSECONDS);
totalLatency.inc(nanos / 1000);
for (LatencyMetrics parent : parents) {
parent.addNano(nanos);
}
// the object is only updated from the API
}
public void release() {

View File

@ -31,12 +31,10 @@ import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.ws.rs.ProcessingException;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
import com.scylladb.jmx.api.APIClient;
import com.yammer.metrics.core.APISettableMeter;
public final class MessagingService implements MessagingServiceMBean {
static final int INTERVAL = 1000; // update every 1second
@ -101,56 +99,26 @@ public final class MessagingService implements MessagingServiceMBean {
logger.finest(str);
}
private static Timer timer = new Timer("Dropped messages");
public MessagingService() {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
jmxObjectName = new ObjectName(MBEAN_NAME);
mbs.registerMBean(this, jmxObjectName);
// mbs.registerMBean(StreamManager.instance, new ObjectName(
// StreamManager.OBJECT_NAME));
dropped = new HashMap<String, DroppedMessageMetrics>();
for (Verb v : Verb.values()) {
dropped.put(v.name(), new DroppedMessageMetrics(v));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
timer.schedule(new CheckDroppedMessages(), INTERVAL, INTERVAL);
}
static MessagingService instance = new MessagingService();
private static final class CheckDroppedMessages extends TimerTask {
int connection_failure = 0;
int report_error = 1;
@Override
public void run() {
if (instance.dropped == null) {
instance.dropped = new HashMap<String, DroppedMessageMetrics>();
for (Verb v : Verb.values()) {
instance.dropped.put(v.name(), new DroppedMessageMetrics(v));
}
}
try {
Map<String, Integer> val = instance.getDroppedMessages();
for (String k : val.keySet()) {
APISettableMeter meter = instance.dropped.get(k).getMeter();
meter.set(val.get(k));
meter.tick();
}
connection_failure = 0;
report_error = 1;
} catch (IllegalStateException e) {
// Connection problem, No need to do anything, just retry.
} catch (Exception e) {
connection_failure++;
if ((connection_failure & report_error) == report_error) {
logger.info("Dropped messages failed with " + e.getMessage() + " total error reported " + connection_failure);
report_error <<= 1;
}
}
}
}
static MessagingService instance;
public static MessagingService getInstance() {
if (instance == null) {
instance = new MessagingService();
}
return instance;
}

View File

@ -65,9 +65,9 @@ public class CacheService implements CacheServiceMBean {
throw new RuntimeException(e);
}
keyCache = new CacheMetrics("KeyCache", "key");
keyCache = new CacheMetrics("KeyCache", null);
rowCache = new CacheMetrics("RowCache", "row");
counterCache = new CacheMetrics("CounterCache", "counter");
counterCache = new CacheMetrics("CounterCache", null);
}
public int getRowCacheSavePeriodInSeconds() {