diff --git a/pom.xml b/pom.xml
index e58aecb..4ecfc53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,11 @@
google-collections
1.0
+
+ mx4j
+ mx4j
+ 3.0.2
+
diff --git a/src/main/java/com/scylladb/jmx/main/Main.java b/src/main/java/com/scylladb/jmx/main/Main.java
index 91e31f1..414aeac 100644
--- a/src/main/java/com/scylladb/jmx/main/Main.java
+++ b/src/main/java/com/scylladb/jmx/main/Main.java
@@ -6,13 +6,11 @@ package com.scylladb.jmx.main;
import com.scylladb.jmx.api.APIConfig;
import com.scylladb.jmx.utils.RMIServerSocketFactoryImpl;
-import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.EndpointSnitchInfo;
-import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.GCInspector;
@@ -33,11 +31,9 @@ public class Main {
Gossiper.getInstance();
EndpointSnitchInfo.getInstance();
FailureDetector.getInstance();
- ColumnFamilyStore.register_mbeans();
CacheService.getInstance();
CompactionManager.getInstance();
GCInspector.register();
- StreamingMetrics.register_mbeans();
Thread.sleep(Long.MAX_VALUE);
}
diff --git a/src/main/java/com/scylladb/jmx/utils/APIBuilder.java b/src/main/java/com/scylladb/jmx/utils/APIBuilder.java
new file mode 100644
index 0000000..941a2bc
--- /dev/null
+++ b/src/main/java/com/scylladb/jmx/utils/APIBuilder.java
@@ -0,0 +1,41 @@
+package com.scylladb.jmx.utils;
+/**
+ * Copyright 2016 ScyllaDB
+ */
+
+/*
+* This file is part of Scylla.
+*
+* Scylla is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Scylla is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with Scylla. If not, see .
+*/
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerDelegate;
+
+import mx4j.server.ChainedMBeanServerBuilder;
+
+public class APIBuilder extends ChainedMBeanServerBuilder {
+ public APIBuilder() {
+ super(new mx4j.server.MX4JMBeanServerBuilder());
+ }
+
+ public MBeanServer newMBeanServer(String defaultDomain, MBeanServer outer,
+ MBeanServerDelegate delegate) {
+ APIMBeanServer extern = new APIMBeanServer();
+ MBeanServer nested = getMBeanServerBuilder().newMBeanServer(
+ defaultDomain, outer == null ? extern : outer, delegate);
+ extern.setMBeanServer(nested);
+ return extern;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java
new file mode 100644
index 0000000..738b2c8
--- /dev/null
+++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java
@@ -0,0 +1,103 @@
+package com.scylladb.jmx.utils;
+/**
+ * Copyright (C) The MX4J Contributors.
+ * All rights reserved.
+ *
+ * This software is distributed under the terms of the MX4J License version 1.0.
+ * See the terms of the MX4J License in the documentation provided with this software.
+ */
+
+/**
+ * Modified by ScyllaDB
+ * Copyright 2016 ScyllaDB
+ */
+/*
+* This file is part of Scylla.
+*
+* Scylla is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Scylla is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with Scylla. If not, see .
+*/
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import javax.management.MBeanInfo;
+
+import mx4j.server.MBeanIntrospector;
+import mx4j.server.MBeanMetaData;
+
+public class APIMBeanIntrospector extends MBeanIntrospector {
+ private static final java.util.logging.Logger logger = java.util.logging.Logger
+ .getLogger(APIMBeanIntrospector.class.getName());
+
+ public boolean isMBeanCompliant(MBeanMetaData metadata) {
+ Class info = metadata.getMBeanInterface();
+ if (info != null) {
+ String cn = info.getName();
+ if (cn != null) {
+ if (cn.endsWith("MXBean")) {
+ return true;
+ }
+ }
+ }
+ return super.isMBeanCompliant(metadata);
+ }
+
+ public void apiIntrospectStandardMBean(MBeanMetaData metadata) {
+ try {
+ Class[] cArg = new Class[1];
+ cArg[0] = MBeanMetaData.class;
+ Method met = MBeanIntrospector.class
+ .getDeclaredMethod("introspectStandardMBean", cArg);
+ met.setAccessible(true);
+ met.invoke((MBeanIntrospector) this, metadata);
+ } catch (NoSuchMethodException | SecurityException
+ | IllegalAccessException | IllegalArgumentException
+ | InvocationTargetException e) {
+ logger.warning("Failed setting mbean info " + e.getMessage());
+ }
+ }
+
+ public void apiIntrospect(MBeanMetaData metadata) {
+ apiIntrospectStandardMBean(metadata);
+ Class[] cArg = new Class[1];
+ cArg[0] = MBeanMetaData.class;
+ try {
+ Method met = MBeanIntrospector.class
+ .getDeclaredMethod("createStandardMBeanInfo", cArg);
+ met.setAccessible(true);
+ Object info = met.invoke((MBeanIntrospector) this, metadata);
+ metadata.setMBeanInfo((MBeanInfo) info);
+ } catch (IllegalAccessException | NoSuchMethodException
+ | SecurityException | IllegalArgumentException
+ | InvocationTargetException e) {
+ logger.warning("Failed setting mbean info" + e.getMessage());
+ }
+ }
+
+ public void introspect(MBeanMetaData metadata) {
+ Class> mx_mbean = null;
+ for (Class> it : metadata.getMBean().getClass().getInterfaces()) {
+ if (it.getName().endsWith("MXBean")) {
+ mx_mbean = it;
+ break;
+ }
+ }
+ if (mx_mbean != null) {
+ metadata.setMBeanInterface(mx_mbean);
+ apiIntrospect(metadata);
+ return;
+ }
+ super.introspect(metadata);
+ }
+}
diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java
new file mode 100644
index 0000000..920b863
--- /dev/null
+++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java
@@ -0,0 +1,66 @@
+package com.scylladb.jmx.utils;
+
+/**
+ * Copyright 2016 ScyllaDB
+ */
+/*
+* This file is part of Scylla.
+*
+* Scylla is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* Scylla is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with Scylla. If not, see .
+*/
+import java.lang.reflect.Field;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.QueryExp;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.metrics.StreamingMetrics;
+
+import mx4j.server.ChainedMBeanServer;
+
+public class APIMBeanServer extends ChainedMBeanServer {
+ private static final java.util.logging.Logger logger = java.util.logging.Logger
+ .getLogger(APIMBeanServer.class.getName());
+
+ public static void log(String str) {
+ logger.finest(str);
+ }
+
+ public void setMBeanServer(MBeanServer server) {
+ if (server != null) {
+ try {
+ Field f = server.getClass().getDeclaredField("introspector");
+ f.setAccessible(true);
+ f.set(server, new APIMBeanIntrospector());
+ } catch (Exception e) {
+ logger.warning(
+ "Failed setting new interceptor" + e.getMessage());
+ }
+ }
+ super.setMBeanServer(server);
+ }
+
+ @Override
+ public Set queryNames(ObjectName name, QueryExp query) {
+ if (name.getCanonicalKeyPropertyListString()
+ .contains("ColumnFamilies")) {
+ ColumnFamilyStore.checkRegistration();
+ } else if (name.getCanonicalKeyPropertyListString()
+ .contains("Stream")) {
+ StreamingMetrics.checkRegistration();
+ }
+ return super.queryNames(name, query);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java b/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java
index 3135d34..a234e56 100644
--- a/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java
+++ b/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java
@@ -41,6 +41,9 @@ public class RMIServerSocketFactoryImpl implements RMIServerSocketFactory {
public static JMXConnectorServer jmxServer = null;
public static void maybeInitJmx() {
+ System.setProperty("javax.management.builder.initial", "com.scylladb.jmx.utils.APIBuilder");
+ System.setProperty("mx4j.strict.mbean.interface", "no");
+
String jmxPort = System
.getProperty("com.sun.management.jmxremote.port");
diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 44d4d1a..ad62dad 100644
--- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -50,6 +50,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
private String keyspace;
private String name;
private String mbeanName;
+ private static APIClient s_c = new APIClient();
static final int INTERVAL = 1000; // update every 1second
public final ColumnFamilyMetrics metric;
@@ -102,8 +103,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
+ ",columnfamily=" + name;
}
+ public static boolean checkRegistration() {
+ try {
+ JsonArray mbeans = s_c.getJsonArray("/column_family/");
+ Set all_cf = new HashSet();
+ for (int i = 0; i < mbeans.size(); i++) {
+ JsonObject mbean = mbeans.getJsonObject(i);
+ String name = getName(mbean.getString("type"),
+ mbean.getString("ks"), mbean.getString("cf"));
+ if (!cf.containsKey(name)) {
+ ColumnFamilyStore cfs = new ColumnFamilyStore(
+ mbean.getString("type"), mbean.getString("ks"),
+ mbean.getString("cf"));
+ cf.put(name, cfs);
+ }
+ all_cf.add(name);
+ }
+ // removing deleted column family
+ for (String n : cf.keySet()) {
+ if (!all_cf.contains(n)) {
+ cf.remove(n);
+ }
+ }
+ } catch (IllegalStateException e) {
+ return false;
+ }
+ return true;
+ }
+
private static final class CheckRegistration extends TimerTask {
- private APIClient c = new APIClient();
private int missed_response = 0;
// After MAX_RETRY retry we assume the API is not available
// and the jmx will shutdown
@@ -111,31 +139,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
@Override
public void run() {
try {
- JsonArray mbeans = c.getJsonArray("/column_family/");
- Set all_cf = new HashSet();
- for (int i = 0; i < mbeans.size(); i++) {
- JsonObject mbean = mbeans.getJsonObject(i);
- String name = getName(mbean.getString("type"),
- mbean.getString("ks"), mbean.getString("cf"));
- if (!cf.containsKey(name)) {
- ColumnFamilyStore cfs = new ColumnFamilyStore(
- mbean.getString("type"), mbean.getString("ks"),
- mbean.getString("cf"));
- cf.put(name, cfs);
+ if (checkRegistration()) {
+ missed_response = 0;
+ } else {
+ if (missed_response++ > MAX_RETRY) {
+ System.err.println("API is not available, JMX is shuting down");
+ System.exit(-1);
}
- all_cf.add(name);
- }
- // removing deleted column family
- for (String n : cf.keySet()) {
- if (!all_cf.contains(n)) {
- cf.remove(n);
- }
- }
- missed_response = 0;
- } catch (IllegalStateException e) {
- if (missed_response++ > MAX_RETRY) {
- System.err.println("API is not available, JMX is shuting down");
- System.exit(-1);
}
} catch (Exception e) {
// ignoring exceptions, will retry on the next interval
diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
index dcd561a..ce389ce 100644
--- a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
+++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -55,6 +55,7 @@ public class StreamingMetrics
public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null));
public final Counter incomingBytes;
public final Counter outgoingBytes;
+ private static APIClient s_c = new APIClient();
public static void register_mbeans() {
TimerTask taskToExecute = new CheckRegistration();
@@ -68,34 +69,38 @@ public class StreamingMetrics
outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes"));
}
- private static final class CheckRegistration extends TimerTask {
- private APIClient c = new APIClient();
+ public static boolean checkRegistration() {
+ try {
+ JsonArray streams = s_c.getJsonArray("/stream_manager/");
+ Set all = new HashSet();
+ for (int i = 0; i < streams.size(); i ++) {
+ JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
+ for (int j = 0; j < sessions.size(); j++) {
+ String name = sessions.getJsonObject(j).getString("peer");
+ if (!instances.containsKey(name)) {
+ StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
+ instances.put(name, metrics);
+ }
+ all.add(name);
+ }
+ }
+ //removing deleted stream
+ for (String n : instances.keySet()) {
+ if (! all.contains(n)) {
+ instances.remove(n);
+ }
+ }
+ } catch (Exception e) {
+ // ignoring exceptions, will retry on the next interval
+ return false;
+ }
+ return true;
+ }
+ private static final class CheckRegistration extends TimerTask {
@Override
public void run() {
- try {
- JsonArray streams = c.getJsonArray("/stream_manager/");
- Set all = new HashSet();
- for (int i = 0; i < streams.size(); i ++) {
- JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
- for (int j = 0; j < sessions.size(); j++) {
- String name = sessions.getJsonObject(j).getString("peer");
- if (!instances.containsKey(name)) {
- StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
- instances.put(name, metrics);
- }
- all.add(name);
- }
- }
- //removing deleted stream
- for (String n : instances.keySet()) {
- if (! all.contains(n)) {
- instances.remove(n);
- }
- }
- } catch (Exception e) {
- // ignoring exceptions, will retry on the next interval
- }
+ checkRegistration();
}
}
}