scylla-jmx: Use registration checker objects
Fixes #134 Refs #135 Replaces previous refresh calls with ones bound to registration check objects, which provides some sync between threads doing refresh, and reduced redundant calls. Also adds repeated reaping of dead objects, i.e. every 5 minutes we try to remove dead CF:s (not adding new ones), to reduce idle footprint.
This commit is contained in:
parent
771fe3e360
commit
ba3f58c63c
|
@ -1,6 +1,7 @@
|
||||||
package com.scylladb.jmx.metrics;
|
package com.scylladb.jmx.metrics;
|
||||||
|
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -55,14 +56,14 @@ public class APIMBean implements MBeanRegistration {
|
||||||
* @param generator
|
* @param generator
|
||||||
* {@link Function} to create a new MBean instance for a given
|
* {@link Function} to create a new MBean instance for a given
|
||||||
* {@link ObjectName}
|
* {@link ObjectName}
|
||||||
*
|
|
||||||
* @return
|
* @return
|
||||||
* @throws MalformedObjectNameException
|
* @throws MalformedObjectNameException
|
||||||
*/
|
*/
|
||||||
public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all,
|
public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all,
|
||||||
final Predicate<ObjectName> predicate, Function<ObjectName, Object> generator)
|
EnumSet<RegistrationMode> mode, final Predicate<ObjectName> predicate,
|
||||||
throws MalformedObjectNameException {
|
Function<ObjectName, Object> generator) throws MalformedObjectNameException {
|
||||||
Set<ObjectName> registered = queryNames(server, predicate);
|
Set<ObjectName> registered = queryNames(server, predicate);
|
||||||
|
if (mode.contains(RegistrationMode.Remove)) {
|
||||||
for (ObjectName name : registered) {
|
for (ObjectName name : registered) {
|
||||||
if (!all.contains(name)) {
|
if (!all.contains(name)) {
|
||||||
try {
|
try {
|
||||||
|
@ -71,14 +72,18 @@ public class APIMBean implements MBeanRegistration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int added = 0;
|
int added = 0;
|
||||||
|
if (mode.contains(RegistrationMode.Add)) {
|
||||||
for (ObjectName name : all) {
|
for (ObjectName name : all) {
|
||||||
if (!registered.contains(name)) {
|
if (!registered.contains(name)) {
|
||||||
try {
|
try {
|
||||||
server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name);
|
server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name);
|
||||||
added++;
|
added++;
|
||||||
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
|
} catch (InstanceAlreadyExistsException | MBeanRegistrationException
|
||||||
|
| NotCompliantMBeanException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
package com.scylladb.jmx.utils;
|
package com.scylladb.jmx.utils;
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static java.util.concurrent.Executors.newScheduledThreadPool;
|
||||||
|
import static java.util.concurrent.TimeUnit.MINUTES;
|
||||||
|
|
||||||
import java.io.ObjectInputStream;
|
import java.io.ObjectInputStream;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -34,12 +39,17 @@ import org.apache.cassandra.db.ColumnFamilyStore;
|
||||||
import org.apache.cassandra.metrics.StreamingMetrics;
|
import org.apache.cassandra.metrics.StreamingMetrics;
|
||||||
|
|
||||||
import com.scylladb.jmx.api.APIClient;
|
import com.scylladb.jmx.api.APIClient;
|
||||||
|
import com.scylladb.jmx.metrics.RegistrationChecker;
|
||||||
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
||||||
|
|
||||||
@SuppressWarnings("restriction")
|
@SuppressWarnings("restriction")
|
||||||
public class APIMBeanServer implements MBeanServer {
|
public class APIMBeanServer implements MBeanServer {
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName());
|
private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName());
|
||||||
|
private static final ScheduledExecutorService executor = newScheduledThreadPool(1);
|
||||||
|
|
||||||
|
private final RegistrationChecker columnFamilyStoreChecker = ColumnFamilyStore.createRegistrationChecker();
|
||||||
|
private final RegistrationChecker streamingMetricsChecker = StreamingMetrics.createRegistrationChecker();
|
||||||
|
|
||||||
private final APIClient client;
|
private final APIClient client;
|
||||||
private final JmxMBeanServer server;
|
private final JmxMBeanServer server;
|
||||||
|
@ -47,6 +57,16 @@ public class APIMBeanServer implements MBeanServer {
|
||||||
public APIMBeanServer(APIClient client, JmxMBeanServer server) {
|
public APIMBeanServer(APIClient client, JmxMBeanServer server) {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
|
executor.scheduleWithFixedDelay(() -> {
|
||||||
|
for (RegistrationChecker c : asList(columnFamilyStoreChecker, streamingMetricsChecker)) {
|
||||||
|
try {
|
||||||
|
c.reap(client, server);
|
||||||
|
} catch (OperationsException | UnknownHostException e) {
|
||||||
|
// TODO: log?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 1, 5, MINUTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ObjectInstance prepareForRemote(final ObjectInstance i) {
|
private static ObjectInstance prepareForRemote(final ObjectInstance i) {
|
||||||
|
@ -287,24 +307,21 @@ public class APIMBeanServer implements MBeanServer {
|
||||||
|
|
||||||
static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$");
|
static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$");
|
||||||
|
|
||||||
private boolean checkRegistrations(ObjectName name) {
|
private void checkRegistrations(ObjectName name) {
|
||||||
if (name != null && server.isRegistered(name)) {
|
if (name != null && server.isRegistered(name)) {
|
||||||
return false;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean result = false;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
String type = name != null ? name.getKeyProperty("type") : null;
|
String type = name != null ? name.getKeyProperty("type") : null;
|
||||||
if (type == null || tables.matcher(type).matches()) {
|
if (type == null || tables.matcher(type).matches()) {
|
||||||
result |= ColumnFamilyStore.checkRegistration(client, server);
|
columnFamilyStoreChecker.check(client, server);
|
||||||
}
|
}
|
||||||
if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) {
|
if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) {
|
||||||
result |= StreamingMetrics.checkRegistration(client, server);
|
streamingMetricsChecker.check(client, server);
|
||||||
}
|
}
|
||||||
} catch (MalformedObjectNameException | UnknownHostException e) {
|
} catch (OperationsException | UnknownHostException e) {
|
||||||
// TODO: log
|
// TODO: log
|
||||||
}
|
}
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -30,6 +30,7 @@ import static javax.json.Json.createObjectBuilder;
|
||||||
|
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -50,6 +51,7 @@ import javax.json.JsonReader;
|
||||||
import javax.management.MBeanServer;
|
import javax.management.MBeanServer;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.OperationsException;
|
||||||
import javax.management.openmbean.CompositeData;
|
import javax.management.openmbean.CompositeData;
|
||||||
import javax.management.openmbean.CompositeDataSupport;
|
import javax.management.openmbean.CompositeDataSupport;
|
||||||
import javax.management.openmbean.CompositeType;
|
import javax.management.openmbean.CompositeType;
|
||||||
|
@ -65,6 +67,8 @@ import org.apache.cassandra.metrics.TableMetrics;
|
||||||
|
|
||||||
import com.scylladb.jmx.api.APIClient;
|
import com.scylladb.jmx.api.APIClient;
|
||||||
import com.scylladb.jmx.metrics.MetricsMBean;
|
import com.scylladb.jmx.metrics.MetricsMBean;
|
||||||
|
import com.scylladb.jmx.metrics.RegistrationChecker;
|
||||||
|
import com.scylladb.jmx.metrics.RegistrationMode;
|
||||||
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
|
|
||||||
|
@ -182,14 +186,21 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
|
||||||
"org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name);
|
"org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean checkRegistration(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException {
|
public static RegistrationChecker createRegistrationChecker() {
|
||||||
|
return new RegistrationChecker() {
|
||||||
|
@Override
|
||||||
|
protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode)
|
||||||
|
throws OperationsException {
|
||||||
JsonArray mbeans = client.getJsonArray("/column_family/");
|
JsonArray mbeans = client.getJsonArray("/column_family/");
|
||||||
Set<ObjectName> all = new HashSet<ObjectName>();
|
Set<ObjectName> all = new HashSet<ObjectName>();
|
||||||
for (int i = 0; i < mbeans.size(); i++) {
|
for (int i = 0; i < mbeans.size(); i++) {
|
||||||
JsonObject mbean = mbeans.getJsonObject(i);
|
JsonObject mbean = mbeans.getJsonObject(i);
|
||||||
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf")));
|
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf")));
|
||||||
}
|
}
|
||||||
return checkRegistration(server, all, n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n));
|
checkRegistration(server, all, mode,
|
||||||
|
n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n));
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,20 +24,23 @@
|
||||||
package org.apache.cassandra.metrics;
|
package org.apache.cassandra.metrics;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static java.util.Collections.emptySet;
|
|
||||||
import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName;
|
import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName;
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.json.JsonArray;
|
import javax.json.JsonArray;
|
||||||
import javax.management.MalformedObjectNameException;
|
import javax.management.MalformedObjectNameException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
import javax.management.OperationsException;
|
||||||
|
|
||||||
import com.scylladb.jmx.api.APIClient;
|
import com.scylladb.jmx.api.APIClient;
|
||||||
import com.scylladb.jmx.metrics.APIMBean;
|
import com.scylladb.jmx.metrics.APIMBean;
|
||||||
|
import com.scylladb.jmx.metrics.RegistrationChecker;
|
||||||
|
import com.scylladb.jmx.metrics.RegistrationMode;
|
||||||
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
import com.sun.jmx.mbeanserver.JmxMBeanServer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,13 +68,10 @@ public class StreamingMetrics {
|
||||||
return TYPE_NAME.equals(n.getKeyProperty("type"));
|
return TYPE_NAME.equals(n.getKeyProperty("type"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void unregister(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException {
|
public static RegistrationChecker createRegistrationChecker() {
|
||||||
APIMBean.checkRegistration(server, emptySet(), StreamingMetrics::isStreamingName, (n) -> null);
|
return new RegistrationChecker() {
|
||||||
}
|
@Override
|
||||||
|
protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode) throws OperationsException, UnknownHostException {
|
||||||
public static boolean checkRegistration(APIClient client, JmxMBeanServer server)
|
|
||||||
throws MalformedObjectNameException, UnknownHostException {
|
|
||||||
|
|
||||||
Set<ObjectName> all = new HashSet<ObjectName>(globalNames);
|
Set<ObjectName> all = new HashSet<ObjectName>(globalNames);
|
||||||
JsonArray streams = client.getJsonArray("/stream_manager/");
|
JsonArray streams = client.getJsonArray("/stream_manager/");
|
||||||
for (int i = 0; i < streams.size(); i++) {
|
for (int i = 0; i < streams.size(); i++) {
|
||||||
|
@ -85,7 +85,7 @@ public class StreamingMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
MetricsRegistry registry = new MetricsRegistry(client, server);
|
MetricsRegistry registry = new MetricsRegistry(client, server);
|
||||||
return APIMBean.checkRegistration(server, all, StreamingMetrics::isStreamingName, n -> {
|
APIMBean.checkRegistration(server, all, mode, StreamingMetrics::isStreamingName, n -> {
|
||||||
String scope = n.getKeyProperty("scope");
|
String scope = n.getKeyProperty("scope");
|
||||||
String name = n.getKeyProperty("name");
|
String name = n.getKeyProperty("name");
|
||||||
|
|
||||||
|
@ -106,4 +106,6 @@ public class StreamingMetrics {
|
||||||
return registry.counter(url);
|
return registry.counter(url);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user