Merge 'JMX footprint work' from Calle

"
Fixes #133
Fixes #134
Refs #135

Makes CF mbean refresh code synchronized and tries to remove reductant
calls if we contend. Adds background reaping of dead objects to reduce
memory load in (test) scenarios where we manage to refresh to add, but
not cause removal (i.e. no wildcard queries).

TableMetricsObjectName serialization is fixed in the series because
without it we see loads of exceptions when refreshing the mbean set.
"

* elcallio-jmx-fixes:
  scylla-jmx: Use registration checker objects
  scylla-jmx: Introduce a registration check object
  scylla-jmx: Fix TableMetricObjectName serialization
This commit is contained in:
Pekka Enberg 2020-09-07 13:54:56 +03:00
commit 8d92e5450e
7 changed files with 209 additions and 87 deletions

View File

@ -1,6 +1,7 @@
package com.scylladb.jmx.metrics; package com.scylladb.jmx.metrics;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -55,35 +56,39 @@ public class APIMBean implements MBeanRegistration {
* @param generator * @param generator
* {@link Function} to create a new MBean instance for a given * {@link Function} to create a new MBean instance for a given
* {@link ObjectName} * {@link ObjectName}
*
* @return * @return
* @throws MalformedObjectNameException * @throws MalformedObjectNameException
*/ */
public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all, public static boolean checkRegistration(JmxMBeanServer server, Set<ObjectName> all,
final Predicate<ObjectName> predicate, Function<ObjectName, Object> generator) EnumSet<RegistrationMode> mode, final Predicate<ObjectName> predicate,
throws MalformedObjectNameException { Function<ObjectName, Object> generator) throws MalformedObjectNameException {
Set<ObjectName> registered = queryNames(server, predicate); Set<ObjectName> registered = queryNames(server, predicate);
for (ObjectName name : registered) { if (mode.contains(RegistrationMode.Remove)) {
if (!all.contains(name)) { for (ObjectName name : registered) {
try { if (!all.contains(name)) {
server.getMBeanServerInterceptor().unregisterMBean(name); try {
} catch (MBeanRegistrationException | InstanceNotFoundException e) { server.getMBeanServerInterceptor().unregisterMBean(name);
} } catch (MBeanRegistrationException | InstanceNotFoundException e) {
} }
} }
}
}
int added = 0; int added = 0;
for (ObjectName name : all) { if (mode.contains(RegistrationMode.Add)) {
if (!registered.contains(name)) { for (ObjectName name : all) {
try { if (!registered.contains(name)) {
server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name); try {
added++; server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name);
} catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { added++;
} } catch (InstanceAlreadyExistsException | MBeanRegistrationException
} | NotCompliantMBeanException e) {
} }
return added > 0; }
} }
}
return added > 0;
}
/** /**
* Helper method to query {@link ObjectName}s from an {@link MBeanServer} * Helper method to query {@link ObjectName}s from an {@link MBeanServer}

View File

@ -0,0 +1,69 @@
package com.scylladb.jmx.metrics;
import static com.scylladb.jmx.metrics.RegistrationMode.Remove;
import static com.scylladb.jmx.metrics.RegistrationMode.Wait;
import static java.util.EnumSet.allOf;
import static java.util.EnumSet.of;
import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.OperationsException;
import com.scylladb.jmx.api.APIClient;
import com.sun.jmx.mbeanserver.JmxMBeanServer;
/**
* Helper type to do optional locking for registration. Allows for
* per-bind-point locks and registration, instead of per-type or per-instance
* locks which may be misguiding, since for example one instance can be bound to
* many MBeanServers etc.
*
* Also allows for polled checks, i.e. try-lock and either wait or skip. Wait,
* because we probably should not repeat things hidden by this type too often,
* and skip because for example a periodic task checking can just skip if a
* user-initiated registration check is being done.
*
* @author calle
*
*/
@SuppressWarnings("restriction")
public abstract class RegistrationChecker {
private final Lock lock = new ReentrantLock();
public static final EnumSet<RegistrationMode> REMOVE_NO_WAIT = of(Remove);
public static final EnumSet<RegistrationMode> ADD_AND_REMOVE = allOf(RegistrationMode.class);
public final void reap(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException {
check(client, server, REMOVE_NO_WAIT);
}
public final void check(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException {
check(client, server, ADD_AND_REMOVE);
}
public final void check(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode)
throws OperationsException, UnknownHostException {
if (!lock.tryLock()) {
if (mode.contains(Wait)) {
// someone is doing update.
// since this is jmx, and sloppy, we'll just
// assume that once he is done, things are
// good enough.
lock.lock();
lock.unlock();
}
return;
}
try {
doCheck(client, server, mode);
} finally {
lock.unlock();
}
}
protected abstract void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode)
throws OperationsException, UnknownHostException;
}

View File

@ -0,0 +1,5 @@
package com.scylladb.jmx.metrics;
public enum RegistrationMode {
Wait, Add, Remove,
}

View File

@ -1,8 +1,13 @@
package com.scylladb.jmx.utils; package com.scylladb.jmx.utils;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -34,12 +39,17 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.metrics.StreamingMetrics;
import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.sun.jmx.mbeanserver.JmxMBeanServer; import com.sun.jmx.mbeanserver.JmxMBeanServer;
@SuppressWarnings("restriction") @SuppressWarnings("restriction")
public class APIMBeanServer implements MBeanServer { public class APIMBeanServer implements MBeanServer {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName()); private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName());
private static final ScheduledExecutorService executor = newScheduledThreadPool(1);
private final RegistrationChecker columnFamilyStoreChecker = ColumnFamilyStore.createRegistrationChecker();
private final RegistrationChecker streamingMetricsChecker = StreamingMetrics.createRegistrationChecker();
private final APIClient client; private final APIClient client;
private final JmxMBeanServer server; private final JmxMBeanServer server;
@ -47,6 +57,16 @@ public class APIMBeanServer implements MBeanServer {
public APIMBeanServer(APIClient client, JmxMBeanServer server) { public APIMBeanServer(APIClient client, JmxMBeanServer server) {
this.client = client; this.client = client;
this.server = server; this.server = server;
executor.scheduleWithFixedDelay(() -> {
for (RegistrationChecker c : asList(columnFamilyStoreChecker, streamingMetricsChecker)) {
try {
c.reap(client, server);
} catch (OperationsException | UnknownHostException e) {
// TODO: log?
}
}
}, 1, 5, MINUTES);
} }
private static ObjectInstance prepareForRemote(final ObjectInstance i) { private static ObjectInstance prepareForRemote(final ObjectInstance i) {
@ -287,24 +307,21 @@ public class APIMBeanServer implements MBeanServer {
static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$"); static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$");
private boolean checkRegistrations(ObjectName name) { private void checkRegistrations(ObjectName name) {
if (name != null && server.isRegistered(name)) { if (name != null && server.isRegistered(name)) {
return false; return;
} }
boolean result = false;
try { try {
String type = name != null ? name.getKeyProperty("type") : null; String type = name != null ? name.getKeyProperty("type") : null;
if (type == null || tables.matcher(type).matches()) { if (type == null || tables.matcher(type).matches()) {
result |= ColumnFamilyStore.checkRegistration(client, server); columnFamilyStoreChecker.check(client, server);
} }
if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) { if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) {
result |= StreamingMetrics.checkRegistration(client, server); streamingMetricsChecker.check(client, server);
} }
} catch (MalformedObjectNameException | UnknownHostException e) { } catch (OperationsException | UnknownHostException e) {
// TODO: log // TODO: log
} }
return result;
} }
} }

View File

@ -30,6 +30,7 @@ import static javax.json.Json.createObjectBuilder;
import java.io.StringReader; import java.io.StringReader;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -50,6 +51,7 @@ import javax.json.JsonReader;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType; import javax.management.openmbean.CompositeType;
@ -65,6 +67,8 @@ import org.apache.cassandra.metrics.TableMetrics;
import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.MetricsMBean; import com.scylladb.jmx.metrics.MetricsMBean;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.scylladb.jmx.metrics.RegistrationMode;
import com.sun.jmx.mbeanserver.JmxMBeanServer; import com.sun.jmx.mbeanserver.JmxMBeanServer;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
@ -182,15 +186,22 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore
"org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name); "org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name);
} }
public static boolean checkRegistration(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { public static RegistrationChecker createRegistrationChecker() {
JsonArray mbeans = client.getJsonArray("/column_family/"); return new RegistrationChecker() {
Set<ObjectName> all = new HashSet<ObjectName>(); @Override
for (int i = 0; i < mbeans.size(); i++) { protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode)
JsonObject mbean = mbeans.getJsonObject(i); throws OperationsException {
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf"))); JsonArray mbeans = client.getJsonArray("/column_family/");
} Set<ObjectName> all = new HashSet<ObjectName>();
return checkRegistration(server, all, n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n)); for (int i = 0; i < mbeans.size(); i++) {
} JsonObject mbean = mbeans.getJsonObject(i);
all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf")));
}
checkRegistration(server, all, mode,
n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n));
}
};
}
/** /**
* @return the name of the column family * @return the name of the column family

View File

@ -24,20 +24,23 @@
package org.apache.cassandra.metrics; package org.apache.cassandra.metrics;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName; import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.EnumSet;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import javax.json.JsonArray; import javax.json.JsonArray;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
import javax.management.ObjectName; import javax.management.ObjectName;
import javax.management.OperationsException;
import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.api.APIClient;
import com.scylladb.jmx.metrics.APIMBean; import com.scylladb.jmx.metrics.APIMBean;
import com.scylladb.jmx.metrics.RegistrationChecker;
import com.scylladb.jmx.metrics.RegistrationMode;
import com.sun.jmx.mbeanserver.JmxMBeanServer; import com.sun.jmx.mbeanserver.JmxMBeanServer;
/** /**
@ -65,45 +68,44 @@ public class StreamingMetrics {
return TYPE_NAME.equals(n.getKeyProperty("type")); return TYPE_NAME.equals(n.getKeyProperty("type"));
} }
public static void unregister(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { public static RegistrationChecker createRegistrationChecker() {
APIMBean.checkRegistration(server, emptySet(), StreamingMetrics::isStreamingName, (n) -> null); return new RegistrationChecker() {
} @Override
protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet<RegistrationMode> mode) throws OperationsException, UnknownHostException {
Set<ObjectName> all = new HashSet<ObjectName>(globalNames);
JsonArray streams = client.getJsonArray("/stream_manager/");
for (int i = 0; i < streams.size(); i++) {
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
for (int j = 0; j < sessions.size(); j++) {
String peer = sessions.getJsonObject(j).getString("peer");
String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", ".");
all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope));
all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope));
}
}
public static boolean checkRegistration(APIClient client, JmxMBeanServer server) MetricsRegistry registry = new MetricsRegistry(client, server);
throws MalformedObjectNameException, UnknownHostException { APIMBean.checkRegistration(server, all, mode, StreamingMetrics::isStreamingName, n -> {
String scope = n.getKeyProperty("scope");
String name = n.getKeyProperty("name");
Set<ObjectName> all = new HashSet<ObjectName>(globalNames); String url = null;
JsonArray streams = client.getJsonArray("/stream_manager/"); if ("ActiveOutboundStreams".equals(name)) {
for (int i = 0; i < streams.size(); i++) { url = "/stream_manager/metrics/outbound";
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); } else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) {
for (int j = 0; j < sessions.size(); j++) { url = "/stream_manager/metrics/incoming";
String peer = sessions.getJsonObject(j).getString("peer"); } else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) {
String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", "."); url = "/stream_manager/metrics/outgoing";
all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope)); }
all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope)); if (url == null) {
} throw new IllegalArgumentException();
} }
if (scope != null) {
MetricsRegistry registry = new MetricsRegistry(client, server); url = url + "/" + scope;
return APIMBean.checkRegistration(server, all, StreamingMetrics::isStreamingName, n -> { }
String scope = n.getKeyProperty("scope"); return registry.counter(url);
String name = n.getKeyProperty("name"); });
}
String url = null; };
if ("ActiveOutboundStreams".equals(name)) { }
url = "/stream_manager/metrics/outbound";
} else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) {
url = "/stream_manager/metrics/incoming";
} else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) {
url = "/stream_manager/metrics/outgoing";
}
if (url == null) {
throw new IllegalArgumentException();
}
if (scope != null) {
url = url + "/" + scope;
}
return registry.counter(url);
});
}
} }

View File

@ -19,6 +19,8 @@ package org.apache.cassandra.metrics;
import static com.scylladb.jmx.api.APIClient.getReader; import static com.scylladb.jmx.api.APIClient.getReader;
import java.io.InvalidObjectException;
import java.io.ObjectStreamException;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
@ -295,7 +297,6 @@ public class TableMetrics implements Metrics {
registry.createDummyTableGauge(Double.class, "PercentRepaired"); registry.createDummyTableGauge(Double.class, "PercentRepaired");
} }
@SuppressWarnings("serial")
static class TableMetricObjectName extends javax.management.ObjectName { static class TableMetricObjectName extends javax.management.ObjectName {
private final TableMetricStringNameFactory factory; private final TableMetricStringNameFactory factory;
private final String metricName; private final String metricName;
@ -400,6 +401,18 @@ public class TableMetrics implements Metrics {
public boolean isPropertyValuePattern() { public boolean isPropertyValuePattern() {
return false; return false;
} }
/**
* This type is not really serializable.
* Replace it with vanilla objectname.
*/
private Object writeReplace() throws ObjectStreamException {
try {
return new ObjectName(getDomain(), getKeyPropertyList());
} catch (MalformedObjectNameException e) {
throw new InvalidObjectException(toString());
}
}
} }
static interface TableMetricStringNameFactory { static interface TableMetricStringNameFactory {