From 1219faf9f17ef77e1db4385665f5225fb6d7e819 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 1 Sep 2020 15:33:04 +0200 Subject: [PATCH 1/3] scylla-jmx: Fix TableMetricObjectName serialization Fixes #133 TableMetricObjectName is not serializable as such, since it depends on a lexicon object etc. Use writeReplace to put a regular ObjectName in the stream instead. --- .../apache/cassandra/metrics/TableMetrics.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/cassandra/metrics/TableMetrics.java b/src/main/java/org/apache/cassandra/metrics/TableMetrics.java index 00af96c..9b83d2b 100644 --- a/src/main/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/main/java/org/apache/cassandra/metrics/TableMetrics.java @@ -19,6 +19,8 @@ package org.apache.cassandra.metrics; import static com.scylladb.jmx.api.APIClient.getReader; +import java.io.InvalidObjectException; +import java.io.ObjectStreamException; import java.util.Hashtable; import java.util.function.BiFunction; import java.util.function.Function; @@ -295,7 +297,6 @@ public class TableMetrics implements Metrics { registry.createDummyTableGauge(Double.class, "PercentRepaired"); } - @SuppressWarnings("serial") static class TableMetricObjectName extends javax.management.ObjectName { private final TableMetricStringNameFactory factory; private final String metricName; @@ -400,6 +401,18 @@ public class TableMetrics implements Metrics { public boolean isPropertyValuePattern() { return false; } + + /** + * This type is not really serializable. + * Replace it with vanilla objectname. + */ + private Object writeReplace() throws ObjectStreamException { + try { + return new ObjectName(getDomain(), getKeyPropertyList()); + } catch (MalformedObjectNameException e) { + throw new InvalidObjectException(toString()); + } + } } static interface TableMetricStringNameFactory { From 771fe3e36056866661a89e53100673791a7229b5 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 1 Sep 2020 15:34:14 +0200 Subject: [PATCH 2/3] scylla-jmx: Introduce a registration check object Allows for shared code for synchronized and optionally partial update checks. --- .../jmx/metrics/RegistrationChecker.java | 69 +++++++++++++++++++ .../jmx/metrics/RegistrationMode.java | 5 ++ 2 files changed, 74 insertions(+) create mode 100644 src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java create mode 100644 src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java diff --git a/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java b/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java new file mode 100644 index 0000000..fde260f --- /dev/null +++ b/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java @@ -0,0 +1,69 @@ +package com.scylladb.jmx.metrics; + +import static com.scylladb.jmx.metrics.RegistrationMode.Remove; +import static com.scylladb.jmx.metrics.RegistrationMode.Wait; +import static java.util.EnumSet.allOf; +import static java.util.EnumSet.of; + +import java.net.UnknownHostException; +import java.util.EnumSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.management.OperationsException; + +import com.scylladb.jmx.api.APIClient; +import com.sun.jmx.mbeanserver.JmxMBeanServer; + +/** + * Helper type to do optional locking for registration. Allows for + * per-bind-point locks and registration, instead of per-type or per-instance + * locks which may be misguiding, since for example one instance can be bound to + * many MBeanServers etc. + * + * Also allows for polled checks, i.e. try-lock and either wait or skip. Wait, + * because we probably should not repeat things hidden by this type too often, + * and skip because for example a periodic task checking can just skip if a + * user-initiated registration check is being done. + * + * @author calle + * + */ +@SuppressWarnings("restriction") +public abstract class RegistrationChecker { + private final Lock lock = new ReentrantLock(); + + public static final EnumSet REMOVE_NO_WAIT = of(Remove); + public static final EnumSet ADD_AND_REMOVE = allOf(RegistrationMode.class); + + public final void reap(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException { + check(client, server, REMOVE_NO_WAIT); + } + + public final void check(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException { + check(client, server, ADD_AND_REMOVE); + } + + public final void check(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException, UnknownHostException { + if (!lock.tryLock()) { + if (mode.contains(Wait)) { + // someone is doing update. + // since this is jmx, and sloppy, we'll just + // assume that once he is done, things are + // good enough. + lock.lock(); + lock.unlock(); + } + return; + } + try { + doCheck(client, server, mode); + } finally { + lock.unlock(); + } + } + + protected abstract void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException, UnknownHostException; +} diff --git a/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java b/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java new file mode 100644 index 0000000..773cde4 --- /dev/null +++ b/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java @@ -0,0 +1,5 @@ +package com.scylladb.jmx.metrics; + +public enum RegistrationMode { + Wait, Add, Remove, +} \ No newline at end of file From ba3f58c63cb2ef64679a6318aa8bf3f8810733cc Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 1 Sep 2020 15:45:45 +0200 Subject: [PATCH 3/3] scylla-jmx: Use registration checker objects Fixes #134 Refs #135 Replaces previous refresh calls with ones bound to registration check objects, which provides some sync between threads doing refresh, and reduced redundant calls. Also adds repeated reaping of dead objects, i.e. every 5 minutes we try to remove dead CF:s (not adding new ones), to reduce idle footprint. --- .../com/scylladb/jmx/metrics/APIMBean.java | 55 ++++++------ .../scylladb/jmx/utils/APIMBeanServer.java | 39 ++++++--- .../cassandra/db/ColumnFamilyStore.java | 29 +++++-- .../cassandra/metrics/StreamingMetrics.java | 84 ++++++++++--------- 4 files changed, 121 insertions(+), 86 deletions(-) diff --git a/src/main/java/com/scylladb/jmx/metrics/APIMBean.java b/src/main/java/com/scylladb/jmx/metrics/APIMBean.java index 8eb8a01..ad95e2a 100644 --- a/src/main/java/com/scylladb/jmx/metrics/APIMBean.java +++ b/src/main/java/com/scylladb/jmx/metrics/APIMBean.java @@ -1,6 +1,7 @@ package com.scylladb.jmx.metrics; import java.lang.reflect.Field; +import java.util.EnumSet; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; @@ -55,35 +56,39 @@ public class APIMBean implements MBeanRegistration { * @param generator * {@link Function} to create a new MBean instance for a given * {@link ObjectName} - * * @return * @throws MalformedObjectNameException */ - public static boolean checkRegistration(JmxMBeanServer server, Set all, - final Predicate predicate, Function generator) - throws MalformedObjectNameException { - Set registered = queryNames(server, predicate); - for (ObjectName name : registered) { - if (!all.contains(name)) { - try { - server.getMBeanServerInterceptor().unregisterMBean(name); - } catch (MBeanRegistrationException | InstanceNotFoundException e) { - } - } - } + public static boolean checkRegistration(JmxMBeanServer server, Set all, + EnumSet mode, final Predicate predicate, + Function generator) throws MalformedObjectNameException { + Set registered = queryNames(server, predicate); + if (mode.contains(RegistrationMode.Remove)) { + for (ObjectName name : registered) { + if (!all.contains(name)) { + try { + server.getMBeanServerInterceptor().unregisterMBean(name); + } catch (MBeanRegistrationException | InstanceNotFoundException e) { + } + } + } + } - int added = 0; - for (ObjectName name : all) { - if (!registered.contains(name)) { - try { - server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name); - added++; - } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - } - } - } - return added > 0; - } + int added = 0; + if (mode.contains(RegistrationMode.Add)) { + for (ObjectName name : all) { + if (!registered.contains(name)) { + try { + server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name); + added++; + } catch (InstanceAlreadyExistsException | MBeanRegistrationException + | NotCompliantMBeanException e) { + } + } + } + } + return added > 0; + } /** * Helper method to query {@link ObjectName}s from an {@link MBeanServer} diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java index 4f7c8c5..cbbad3e 100644 --- a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java +++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java @@ -1,8 +1,13 @@ package com.scylladb.jmx.utils; +import static java.util.Arrays.asList; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.TimeUnit.MINUTES; + import java.io.ObjectInputStream; import java.net.UnknownHostException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,12 +39,17 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.metrics.StreamingMetrics; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.RegistrationChecker; import com.sun.jmx.mbeanserver.JmxMBeanServer; @SuppressWarnings("restriction") public class APIMBeanServer implements MBeanServer { @SuppressWarnings("unused") private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName()); + private static final ScheduledExecutorService executor = newScheduledThreadPool(1); + + private final RegistrationChecker columnFamilyStoreChecker = ColumnFamilyStore.createRegistrationChecker(); + private final RegistrationChecker streamingMetricsChecker = StreamingMetrics.createRegistrationChecker(); private final APIClient client; private final JmxMBeanServer server; @@ -47,6 +57,16 @@ public class APIMBeanServer implements MBeanServer { public APIMBeanServer(APIClient client, JmxMBeanServer server) { this.client = client; this.server = server; + + executor.scheduleWithFixedDelay(() -> { + for (RegistrationChecker c : asList(columnFamilyStoreChecker, streamingMetricsChecker)) { + try { + c.reap(client, server); + } catch (OperationsException | UnknownHostException e) { + // TODO: log? + } + } + }, 1, 5, MINUTES); } private static ObjectInstance prepareForRemote(final ObjectInstance i) { @@ -65,7 +85,7 @@ public class APIMBeanServer implements MBeanServer { throw new IllegalArgumentException(n.toString()); } } - + @Override public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException { @@ -286,25 +306,22 @@ public class APIMBeanServer implements MBeanServer { } static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$"); - - private boolean checkRegistrations(ObjectName name) { + + private void checkRegistrations(ObjectName name) { if (name != null && server.isRegistered(name)) { - return false; + return; } - - boolean result = false; - + try { String type = name != null ? name.getKeyProperty("type") : null; if (type == null || tables.matcher(type).matches()) { - result |= ColumnFamilyStore.checkRegistration(client, server); + columnFamilyStoreChecker.check(client, server); } if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) { - result |= StreamingMetrics.checkRegistration(client, server); + streamingMetricsChecker.check(client, server); } - } catch (MalformedObjectNameException | UnknownHostException e) { + } catch (OperationsException | UnknownHostException e) { // TODO: log } - return result; } } \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java index 92b12ae..0833f02 100644 --- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,6 +30,7 @@ import static javax.json.Json.createObjectBuilder; import java.io.StringReader; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,6 +51,7 @@ import javax.json.JsonReader; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.OperationsException; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeType; @@ -65,6 +67,8 @@ import org.apache.cassandra.metrics.TableMetrics; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.MetricsMBean; +import com.scylladb.jmx.metrics.RegistrationChecker; +import com.scylladb.jmx.metrics.RegistrationMode; import com.sun.jmx.mbeanserver.JmxMBeanServer; import com.google.common.base.Throwables; @@ -182,15 +186,22 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore "org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name); } - public static boolean checkRegistration(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { - JsonArray mbeans = client.getJsonArray("/column_family/"); - Set all = new HashSet(); - for (int i = 0; i < mbeans.size(); i++) { - JsonObject mbean = mbeans.getJsonObject(i); - all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf"))); - } - return checkRegistration(server, all, n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n)); - } + public static RegistrationChecker createRegistrationChecker() { + return new RegistrationChecker() { + @Override + protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException { + JsonArray mbeans = client.getJsonArray("/column_family/"); + Set all = new HashSet(); + for (int i = 0; i < mbeans.size(); i++) { + JsonObject mbean = mbeans.getJsonObject(i); + all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf"))); + } + checkRegistration(server, all, mode, + n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n)); + } + }; + } /** * @return the name of the column family diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java index fc4fd17..967bdfe 100644 --- a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java +++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -24,20 +24,23 @@ package org.apache.cassandra.metrics; import static java.util.Arrays.asList; -import static java.util.Collections.emptySet; import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import javax.json.JsonArray; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.OperationsException; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.APIMBean; +import com.scylladb.jmx.metrics.RegistrationChecker; +import com.scylladb.jmx.metrics.RegistrationMode; import com.sun.jmx.mbeanserver.JmxMBeanServer; /** @@ -64,46 +67,45 @@ public class StreamingMetrics { private static boolean isStreamingName(ObjectName n) { return TYPE_NAME.equals(n.getKeyProperty("type")); } + + public static RegistrationChecker createRegistrationChecker() { + return new RegistrationChecker() { + @Override + protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) throws OperationsException, UnknownHostException { + Set all = new HashSet(globalNames); + JsonArray streams = client.getJsonArray("/stream_manager/"); + for (int i = 0; i < streams.size(); i++) { + JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); + for (int j = 0; j < sessions.size(); j++) { + String peer = sessions.getJsonObject(j).getString("peer"); + String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", "."); + all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope)); + all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope)); + } + } - public static void unregister(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { - APIMBean.checkRegistration(server, emptySet(), StreamingMetrics::isStreamingName, (n) -> null); - } + MetricsRegistry registry = new MetricsRegistry(client, server); + APIMBean.checkRegistration(server, all, mode, StreamingMetrics::isStreamingName, n -> { + String scope = n.getKeyProperty("scope"); + String name = n.getKeyProperty("name"); - public static boolean checkRegistration(APIClient client, JmxMBeanServer server) - throws MalformedObjectNameException, UnknownHostException { - - Set all = new HashSet(globalNames); - JsonArray streams = client.getJsonArray("/stream_manager/"); - for (int i = 0; i < streams.size(); i++) { - JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); - for (int j = 0; j < sessions.size(); j++) { - String peer = sessions.getJsonObject(j).getString("peer"); - String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", "."); - all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope)); - all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope)); - } - } - - MetricsRegistry registry = new MetricsRegistry(client, server); - return APIMBean.checkRegistration(server, all, StreamingMetrics::isStreamingName, n -> { - String scope = n.getKeyProperty("scope"); - String name = n.getKeyProperty("name"); - - String url = null; - if ("ActiveOutboundStreams".equals(name)) { - url = "/stream_manager/metrics/outbound"; - } else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) { - url = "/stream_manager/metrics/incoming"; - } else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) { - url = "/stream_manager/metrics/outgoing"; - } - if (url == null) { - throw new IllegalArgumentException(); - } - if (scope != null) { - url = url + "/" + scope; - } - return registry.counter(url); - }); - } + String url = null; + if ("ActiveOutboundStreams".equals(name)) { + url = "/stream_manager/metrics/outbound"; + } else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) { + url = "/stream_manager/metrics/incoming"; + } else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) { + url = "/stream_manager/metrics/outgoing"; + } + if (url == null) { + throw new IllegalArgumentException(); + } + if (scope != null) { + url = url + "/" + scope; + } + return registry.counter(url); + }); + } + }; + } }