diff --git a/src/main/java/com/scylladb/jmx/metrics/APIMBean.java b/src/main/java/com/scylladb/jmx/metrics/APIMBean.java index 8eb8a01..ad95e2a 100644 --- a/src/main/java/com/scylladb/jmx/metrics/APIMBean.java +++ b/src/main/java/com/scylladb/jmx/metrics/APIMBean.java @@ -1,6 +1,7 @@ package com.scylladb.jmx.metrics; import java.lang.reflect.Field; +import java.util.EnumSet; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; @@ -55,35 +56,39 @@ public class APIMBean implements MBeanRegistration { * @param generator * {@link Function} to create a new MBean instance for a given * {@link ObjectName} - * * @return * @throws MalformedObjectNameException */ - public static boolean checkRegistration(JmxMBeanServer server, Set all, - final Predicate predicate, Function generator) - throws MalformedObjectNameException { - Set registered = queryNames(server, predicate); - for (ObjectName name : registered) { - if (!all.contains(name)) { - try { - server.getMBeanServerInterceptor().unregisterMBean(name); - } catch (MBeanRegistrationException | InstanceNotFoundException e) { - } - } - } + public static boolean checkRegistration(JmxMBeanServer server, Set all, + EnumSet mode, final Predicate predicate, + Function generator) throws MalformedObjectNameException { + Set registered = queryNames(server, predicate); + if (mode.contains(RegistrationMode.Remove)) { + for (ObjectName name : registered) { + if (!all.contains(name)) { + try { + server.getMBeanServerInterceptor().unregisterMBean(name); + } catch (MBeanRegistrationException | InstanceNotFoundException e) { + } + } + } + } - int added = 0; - for (ObjectName name : all) { - if (!registered.contains(name)) { - try { - server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name); - added++; - } catch (InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - } - } - } - return added > 0; - } + int added = 0; + if (mode.contains(RegistrationMode.Add)) { + for (ObjectName name : all) { + if (!registered.contains(name)) { + try { + server.getMBeanServerInterceptor().registerMBean(generator.apply(name), name); + added++; + } catch (InstanceAlreadyExistsException | MBeanRegistrationException + | NotCompliantMBeanException e) { + } + } + } + } + return added > 0; + } /** * Helper method to query {@link ObjectName}s from an {@link MBeanServer} diff --git a/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java b/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java new file mode 100644 index 0000000..fde260f --- /dev/null +++ b/src/main/java/com/scylladb/jmx/metrics/RegistrationChecker.java @@ -0,0 +1,69 @@ +package com.scylladb.jmx.metrics; + +import static com.scylladb.jmx.metrics.RegistrationMode.Remove; +import static com.scylladb.jmx.metrics.RegistrationMode.Wait; +import static java.util.EnumSet.allOf; +import static java.util.EnumSet.of; + +import java.net.UnknownHostException; +import java.util.EnumSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.management.OperationsException; + +import com.scylladb.jmx.api.APIClient; +import com.sun.jmx.mbeanserver.JmxMBeanServer; + +/** + * Helper type to do optional locking for registration. Allows for + * per-bind-point locks and registration, instead of per-type or per-instance + * locks which may be misguiding, since for example one instance can be bound to + * many MBeanServers etc. + * + * Also allows for polled checks, i.e. try-lock and either wait or skip. Wait, + * because we probably should not repeat things hidden by this type too often, + * and skip because for example a periodic task checking can just skip if a + * user-initiated registration check is being done. + * + * @author calle + * + */ +@SuppressWarnings("restriction") +public abstract class RegistrationChecker { + private final Lock lock = new ReentrantLock(); + + public static final EnumSet REMOVE_NO_WAIT = of(Remove); + public static final EnumSet ADD_AND_REMOVE = allOf(RegistrationMode.class); + + public final void reap(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException { + check(client, server, REMOVE_NO_WAIT); + } + + public final void check(APIClient client, JmxMBeanServer server) throws OperationsException, UnknownHostException { + check(client, server, ADD_AND_REMOVE); + } + + public final void check(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException, UnknownHostException { + if (!lock.tryLock()) { + if (mode.contains(Wait)) { + // someone is doing update. + // since this is jmx, and sloppy, we'll just + // assume that once he is done, things are + // good enough. + lock.lock(); + lock.unlock(); + } + return; + } + try { + doCheck(client, server, mode); + } finally { + lock.unlock(); + } + } + + protected abstract void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException, UnknownHostException; +} diff --git a/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java b/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java new file mode 100644 index 0000000..773cde4 --- /dev/null +++ b/src/main/java/com/scylladb/jmx/metrics/RegistrationMode.java @@ -0,0 +1,5 @@ +package com.scylladb.jmx.metrics; + +public enum RegistrationMode { + Wait, Add, Remove, +} \ No newline at end of file diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java index 4f7c8c5..cbbad3e 100644 --- a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java +++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java @@ -1,8 +1,13 @@ package com.scylladb.jmx.utils; +import static java.util.Arrays.asList; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static java.util.concurrent.TimeUnit.MINUTES; + import java.io.ObjectInputStream; import java.net.UnknownHostException; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -34,12 +39,17 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.metrics.StreamingMetrics; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.RegistrationChecker; import com.sun.jmx.mbeanserver.JmxMBeanServer; @SuppressWarnings("restriction") public class APIMBeanServer implements MBeanServer { @SuppressWarnings("unused") private static final Logger logger = Logger.getLogger(APIMBeanServer.class.getName()); + private static final ScheduledExecutorService executor = newScheduledThreadPool(1); + + private final RegistrationChecker columnFamilyStoreChecker = ColumnFamilyStore.createRegistrationChecker(); + private final RegistrationChecker streamingMetricsChecker = StreamingMetrics.createRegistrationChecker(); private final APIClient client; private final JmxMBeanServer server; @@ -47,6 +57,16 @@ public class APIMBeanServer implements MBeanServer { public APIMBeanServer(APIClient client, JmxMBeanServer server) { this.client = client; this.server = server; + + executor.scheduleWithFixedDelay(() -> { + for (RegistrationChecker c : asList(columnFamilyStoreChecker, streamingMetricsChecker)) { + try { + c.reap(client, server); + } catch (OperationsException | UnknownHostException e) { + // TODO: log? + } + } + }, 1, 5, MINUTES); } private static ObjectInstance prepareForRemote(final ObjectInstance i) { @@ -65,7 +85,7 @@ public class APIMBeanServer implements MBeanServer { throw new IllegalArgumentException(n.toString()); } } - + @Override public ObjectInstance createMBean(String className, ObjectName name) throws ReflectionException, InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, NotCompliantMBeanException { @@ -286,25 +306,22 @@ public class APIMBeanServer implements MBeanServer { } static final Pattern tables = Pattern.compile("^\\*?((Index)?ColumnFamil(ies|y)|(Index)?(Table(s)?)?)$"); - - private boolean checkRegistrations(ObjectName name) { + + private void checkRegistrations(ObjectName name) { if (name != null && server.isRegistered(name)) { - return false; + return; } - - boolean result = false; - + try { String type = name != null ? name.getKeyProperty("type") : null; if (type == null || tables.matcher(type).matches()) { - result |= ColumnFamilyStore.checkRegistration(client, server); + columnFamilyStoreChecker.check(client, server); } if (type == null || StreamingMetrics.TYPE_NAME.equals(type)) { - result |= StreamingMetrics.checkRegistration(client, server); + streamingMetricsChecker.check(client, server); } - } catch (MalformedObjectNameException | UnknownHostException e) { + } catch (OperationsException | UnknownHostException e) { // TODO: log } - return result; } } \ No newline at end of file diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java index 92b12ae..0833f02 100644 --- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -30,6 +30,7 @@ import static javax.json.Json.createObjectBuilder; import java.io.StringReader; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -50,6 +51,7 @@ import javax.json.JsonReader; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.OperationsException; import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeType; @@ -65,6 +67,8 @@ import org.apache.cassandra.metrics.TableMetrics; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.MetricsMBean; +import com.scylladb.jmx.metrics.RegistrationChecker; +import com.scylladb.jmx.metrics.RegistrationMode; import com.sun.jmx.mbeanserver.JmxMBeanServer; import com.google.common.base.Throwables; @@ -182,15 +186,22 @@ public class ColumnFamilyStore extends MetricsMBean implements ColumnFamilyStore "org.apache.cassandra.db:type=" + type + ",keyspace=" + keyspace + ",columnfamily=" + name); } - public static boolean checkRegistration(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { - JsonArray mbeans = client.getJsonArray("/column_family/"); - Set all = new HashSet(); - for (int i = 0; i < mbeans.size(); i++) { - JsonObject mbean = mbeans.getJsonObject(i); - all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf"))); - } - return checkRegistration(server, all, n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n)); - } + public static RegistrationChecker createRegistrationChecker() { + return new RegistrationChecker() { + @Override + protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) + throws OperationsException { + JsonArray mbeans = client.getJsonArray("/column_family/"); + Set all = new HashSet(); + for (int i = 0; i < mbeans.size(); i++) { + JsonObject mbean = mbeans.getJsonObject(i); + all.add(getName(mbean.getString("type"), mbean.getString("ks"), mbean.getString("cf"))); + } + checkRegistration(server, all, mode, + n -> TYPE_NAMES.contains(n.getKeyProperty("type")), n -> new ColumnFamilyStore(client, n)); + } + }; + } /** * @return the name of the column family diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java index fc4fd17..967bdfe 100644 --- a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java +++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -24,20 +24,23 @@ package org.apache.cassandra.metrics; import static java.util.Arrays.asList; -import static java.util.Collections.emptySet; import static org.apache.cassandra.metrics.DefaultNameFactory.createMetricName; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import javax.json.JsonArray; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.OperationsException; import com.scylladb.jmx.api.APIClient; import com.scylladb.jmx.metrics.APIMBean; +import com.scylladb.jmx.metrics.RegistrationChecker; +import com.scylladb.jmx.metrics.RegistrationMode; import com.sun.jmx.mbeanserver.JmxMBeanServer; /** @@ -64,46 +67,45 @@ public class StreamingMetrics { private static boolean isStreamingName(ObjectName n) { return TYPE_NAME.equals(n.getKeyProperty("type")); } + + public static RegistrationChecker createRegistrationChecker() { + return new RegistrationChecker() { + @Override + protected void doCheck(APIClient client, JmxMBeanServer server, EnumSet mode) throws OperationsException, UnknownHostException { + Set all = new HashSet(globalNames); + JsonArray streams = client.getJsonArray("/stream_manager/"); + for (int i = 0; i < streams.size(); i++) { + JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); + for (int j = 0; j < sessions.size(); j++) { + String peer = sessions.getJsonObject(j).getString("peer"); + String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", "."); + all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope)); + all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope)); + } + } - public static void unregister(APIClient client, JmxMBeanServer server) throws MalformedObjectNameException { - APIMBean.checkRegistration(server, emptySet(), StreamingMetrics::isStreamingName, (n) -> null); - } + MetricsRegistry registry = new MetricsRegistry(client, server); + APIMBean.checkRegistration(server, all, mode, StreamingMetrics::isStreamingName, n -> { + String scope = n.getKeyProperty("scope"); + String name = n.getKeyProperty("name"); - public static boolean checkRegistration(APIClient client, JmxMBeanServer server) - throws MalformedObjectNameException, UnknownHostException { - - Set all = new HashSet(globalNames); - JsonArray streams = client.getJsonArray("/stream_manager/"); - for (int i = 0; i < streams.size(); i++) { - JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); - for (int j = 0; j < sessions.size(); j++) { - String peer = sessions.getJsonObject(j).getString("peer"); - String scope = InetAddress.getByName(peer).getHostAddress().replaceAll(":", "."); - all.add(createMetricName(TYPE_NAME, "IncomingBytes", scope)); - all.add(createMetricName(TYPE_NAME, "OutgoingBytes", scope)); - } - } - - MetricsRegistry registry = new MetricsRegistry(client, server); - return APIMBean.checkRegistration(server, all, StreamingMetrics::isStreamingName, n -> { - String scope = n.getKeyProperty("scope"); - String name = n.getKeyProperty("name"); - - String url = null; - if ("ActiveOutboundStreams".equals(name)) { - url = "/stream_manager/metrics/outbound"; - } else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) { - url = "/stream_manager/metrics/incoming"; - } else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) { - url = "/stream_manager/metrics/outgoing"; - } - if (url == null) { - throw new IllegalArgumentException(); - } - if (scope != null) { - url = url + "/" + scope; - } - return registry.counter(url); - }); - } + String url = null; + if ("ActiveOutboundStreams".equals(name)) { + url = "/stream_manager/metrics/outbound"; + } else if ("IncomingBytes".equals(name) || "TotalIncomingBytes".equals(name)) { + url = "/stream_manager/metrics/incoming"; + } else if ("OutgoingBytes".equals(name) || "TotalOutgoingBytes".equals(name)) { + url = "/stream_manager/metrics/outgoing"; + } + if (url == null) { + throw new IllegalArgumentException(); + } + if (scope != null) { + url = url + "/" + scope; + } + return registry.counter(url); + }); + } + }; + } } diff --git a/src/main/java/org/apache/cassandra/metrics/TableMetrics.java b/src/main/java/org/apache/cassandra/metrics/TableMetrics.java index 00af96c..9b83d2b 100644 --- a/src/main/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/main/java/org/apache/cassandra/metrics/TableMetrics.java @@ -19,6 +19,8 @@ package org.apache.cassandra.metrics; import static com.scylladb.jmx.api.APIClient.getReader; +import java.io.InvalidObjectException; +import java.io.ObjectStreamException; import java.util.Hashtable; import java.util.function.BiFunction; import java.util.function.Function; @@ -295,7 +297,6 @@ public class TableMetrics implements Metrics { registry.createDummyTableGauge(Double.class, "PercentRepaired"); } - @SuppressWarnings("serial") static class TableMetricObjectName extends javax.management.ObjectName { private final TableMetricStringNameFactory factory; private final String metricName; @@ -400,6 +401,18 @@ public class TableMetrics implements Metrics { public boolean isPropertyValuePattern() { return false; } + + /** + * This type is not really serializable. + * Replace it with vanilla objectname. + */ + private Object writeReplace() throws ObjectStreamException { + try { + return new ObjectName(getDomain(), getKeyPropertyList()); + } catch (MalformedObjectNameException e) { + throw new InvalidObjectException(toString()); + } + } } static interface TableMetricStringNameFactory {