Merge "Remove the pulling mode for MBean registration" from Amnon

"This series replaces that mechanism with an implementation of the MBeanServer
that intercept the relevant MBean call and call the relevant registration
function.

The pulling mechanism was removed from Main."
This commit is contained in:
Avi Kivity 2016-05-02 15:40:37 +03:00
commit f8112b5d57
8 changed files with 283 additions and 54 deletions

View File

@ -81,6 +81,11 @@
<artifactId>google-collections</artifactId> <artifactId>google-collections</artifactId>
<version>1.0</version> <version>1.0</version>
</dependency> </dependency>
<dependency>
<groupId>mx4j</groupId>
<artifactId>mx4j</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>

View File

@ -6,13 +6,11 @@ package com.scylladb.jmx.main;
import com.scylladb.jmx.api.APIConfig; import com.scylladb.jmx.api.APIConfig;
import com.scylladb.jmx.utils.RMIServerSocketFactoryImpl; import com.scylladb.jmx.utils.RMIServerSocketFactoryImpl;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.EndpointSnitchInfo; import org.apache.cassandra.locator.EndpointSnitchInfo;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.GCInspector; import org.apache.cassandra.service.GCInspector;
@ -33,11 +31,9 @@ public class Main {
Gossiper.getInstance(); Gossiper.getInstance();
EndpointSnitchInfo.getInstance(); EndpointSnitchInfo.getInstance();
FailureDetector.getInstance(); FailureDetector.getInstance();
ColumnFamilyStore.register_mbeans();
CacheService.getInstance(); CacheService.getInstance();
CompactionManager.getInstance(); CompactionManager.getInstance();
GCInspector.register(); GCInspector.register();
StreamingMetrics.register_mbeans();
Thread.sleep(Long.MAX_VALUE); Thread.sleep(Long.MAX_VALUE);
} }

View File

@ -0,0 +1,41 @@
package com.scylladb.jmx.utils;
/**
* Copyright 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
import javax.management.MBeanServer;
import javax.management.MBeanServerDelegate;
import mx4j.server.ChainedMBeanServerBuilder;
public class APIBuilder extends ChainedMBeanServerBuilder {
public APIBuilder() {
super(new mx4j.server.MX4JMBeanServerBuilder());
}
public MBeanServer newMBeanServer(String defaultDomain, MBeanServer outer,
MBeanServerDelegate delegate) {
APIMBeanServer extern = new APIMBeanServer();
MBeanServer nested = getMBeanServerBuilder().newMBeanServer(
defaultDomain, outer == null ? extern : outer, delegate);
extern.setMBeanServer(nested);
return extern;
}
}

View File

@ -0,0 +1,103 @@
package com.scylladb.jmx.utils;
/**
* Copyright (C) The MX4J Contributors.
* All rights reserved.
*
* This software is distributed under the terms of the MX4J License version 1.0.
* See the terms of the MX4J License in the documentation provided with this software.
*/
/**
* Modified by ScyllaDB
* Copyright 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import javax.management.MBeanInfo;
import mx4j.server.MBeanIntrospector;
import mx4j.server.MBeanMetaData;
public class APIMBeanIntrospector extends MBeanIntrospector {
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(APIMBeanIntrospector.class.getName());
public boolean isMBeanCompliant(MBeanMetaData metadata) {
Class info = metadata.getMBeanInterface();
if (info != null) {
String cn = info.getName();
if (cn != null) {
if (cn.endsWith("MXBean")) {
return true;
}
}
}
return super.isMBeanCompliant(metadata);
}
public void apiIntrospectStandardMBean(MBeanMetaData metadata) {
try {
Class[] cArg = new Class[1];
cArg[0] = MBeanMetaData.class;
Method met = MBeanIntrospector.class
.getDeclaredMethod("introspectStandardMBean", cArg);
met.setAccessible(true);
met.invoke((MBeanIntrospector) this, metadata);
} catch (NoSuchMethodException | SecurityException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warning("Failed setting mbean info " + e.getMessage());
}
}
public void apiIntrospect(MBeanMetaData metadata) {
apiIntrospectStandardMBean(metadata);
Class[] cArg = new Class[1];
cArg[0] = MBeanMetaData.class;
try {
Method met = MBeanIntrospector.class
.getDeclaredMethod("createStandardMBeanInfo", cArg);
met.setAccessible(true);
Object info = met.invoke((MBeanIntrospector) this, metadata);
metadata.setMBeanInfo((MBeanInfo) info);
} catch (IllegalAccessException | NoSuchMethodException
| SecurityException | IllegalArgumentException
| InvocationTargetException e) {
logger.warning("Failed setting mbean info" + e.getMessage());
}
}
public void introspect(MBeanMetaData metadata) {
Class<?> mx_mbean = null;
for (Class<?> it : metadata.getMBean().getClass().getInterfaces()) {
if (it.getName().endsWith("MXBean")) {
mx_mbean = it;
break;
}
}
if (mx_mbean != null) {
metadata.setMBeanInterface(mx_mbean);
apiIntrospect(metadata);
return;
}
super.introspect(metadata);
}
}

View File

@ -0,0 +1,66 @@
package com.scylladb.jmx.utils;
/**
* Copyright 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
import java.lang.reflect.Field;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.metrics.StreamingMetrics;
import mx4j.server.ChainedMBeanServer;
public class APIMBeanServer extends ChainedMBeanServer {
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(APIMBeanServer.class.getName());
public static void log(String str) {
logger.finest(str);
}
public void setMBeanServer(MBeanServer server) {
if (server != null) {
try {
Field f = server.getClass().getDeclaredField("introspector");
f.setAccessible(true);
f.set(server, new APIMBeanIntrospector());
} catch (Exception e) {
logger.warning(
"Failed setting new interceptor" + e.getMessage());
}
}
super.setMBeanServer(server);
}
@Override
public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {
if (name.getCanonicalKeyPropertyListString()
.contains("ColumnFamilies")) {
ColumnFamilyStore.checkRegistration();
} else if (name.getCanonicalKeyPropertyListString()
.contains("Stream")) {
StreamingMetrics.checkRegistration();
}
return super.queryNames(name, query);
}
}

View File

@ -41,6 +41,9 @@ public class RMIServerSocketFactoryImpl implements RMIServerSocketFactory {
public static JMXConnectorServer jmxServer = null; public static JMXConnectorServer jmxServer = null;
public static void maybeInitJmx() { public static void maybeInitJmx() {
System.setProperty("javax.management.builder.initial", "com.scylladb.jmx.utils.APIBuilder");
System.setProperty("mx4j.strict.mbean.interface", "no");
String jmxPort = System String jmxPort = System
.getProperty("com.sun.management.jmxremote.port"); .getProperty("com.sun.management.jmxremote.port");

View File

@ -50,6 +50,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
private String keyspace; private String keyspace;
private String name; private String name;
private String mbeanName; private String mbeanName;
private static APIClient s_c = new APIClient();
static final int INTERVAL = 1000; // update every 1second static final int INTERVAL = 1000; // update every 1second
public final ColumnFamilyMetrics metric; public final ColumnFamilyMetrics metric;
@ -102,8 +103,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
+ ",columnfamily=" + name; + ",columnfamily=" + name;
} }
public static boolean checkRegistration() {
try {
JsonArray mbeans = s_c.getJsonArray("/column_family/");
Set<String> all_cf = new HashSet<String>();
for (int i = 0; i < mbeans.size(); i++) {
JsonObject mbean = mbeans.getJsonObject(i);
String name = getName(mbean.getString("type"),
mbean.getString("ks"), mbean.getString("cf"));
if (!cf.containsKey(name)) {
ColumnFamilyStore cfs = new ColumnFamilyStore(
mbean.getString("type"), mbean.getString("ks"),
mbean.getString("cf"));
cf.put(name, cfs);
}
all_cf.add(name);
}
// removing deleted column family
for (String n : cf.keySet()) {
if (!all_cf.contains(n)) {
cf.remove(n);
}
}
} catch (IllegalStateException e) {
return false;
}
return true;
}
private static final class CheckRegistration extends TimerTask { private static final class CheckRegistration extends TimerTask {
private APIClient c = new APIClient();
private int missed_response = 0; private int missed_response = 0;
// After MAX_RETRY retry we assume the API is not available // After MAX_RETRY retry we assume the API is not available
// and the jmx will shutdown // and the jmx will shutdown
@ -111,31 +139,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean {
@Override @Override
public void run() { public void run() {
try { try {
JsonArray mbeans = c.getJsonArray("/column_family/"); if (checkRegistration()) {
Set<String> all_cf = new HashSet<String>(); missed_response = 0;
for (int i = 0; i < mbeans.size(); i++) { } else {
JsonObject mbean = mbeans.getJsonObject(i); if (missed_response++ > MAX_RETRY) {
String name = getName(mbean.getString("type"), System.err.println("API is not available, JMX is shuting down");
mbean.getString("ks"), mbean.getString("cf")); System.exit(-1);
if (!cf.containsKey(name)) {
ColumnFamilyStore cfs = new ColumnFamilyStore(
mbean.getString("type"), mbean.getString("ks"),
mbean.getString("cf"));
cf.put(name, cfs);
} }
all_cf.add(name);
}
// removing deleted column family
for (String n : cf.keySet()) {
if (!all_cf.contains(n)) {
cf.remove(n);
}
}
missed_response = 0;
} catch (IllegalStateException e) {
if (missed_response++ > MAX_RETRY) {
System.err.println("API is not available, JMX is shuting down");
System.exit(-1);
} }
} catch (Exception e) { } catch (Exception e) {
// ignoring exceptions, will retry on the next interval // ignoring exceptions, will retry on the next interval

View File

@ -55,6 +55,7 @@ public class StreamingMetrics
public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null)); public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null));
public final Counter incomingBytes; public final Counter incomingBytes;
public final Counter outgoingBytes; public final Counter outgoingBytes;
private static APIClient s_c = new APIClient();
public static void register_mbeans() { public static void register_mbeans() {
TimerTask taskToExecute = new CheckRegistration(); TimerTask taskToExecute = new CheckRegistration();
@ -68,34 +69,38 @@ public class StreamingMetrics
outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes")); outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes"));
} }
private static final class CheckRegistration extends TimerTask { public static boolean checkRegistration() {
private APIClient c = new APIClient(); try {
JsonArray streams = s_c.getJsonArray("/stream_manager/");
Set<String> all = new HashSet<String>();
for (int i = 0; i < streams.size(); i ++) {
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
for (int j = 0; j < sessions.size(); j++) {
String name = sessions.getJsonObject(j).getString("peer");
if (!instances.containsKey(name)) {
StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
instances.put(name, metrics);
}
all.add(name);
}
}
//removing deleted stream
for (String n : instances.keySet()) {
if (! all.contains(n)) {
instances.remove(n);
}
}
} catch (Exception e) {
// ignoring exceptions, will retry on the next interval
return false;
}
return true;
}
private static final class CheckRegistration extends TimerTask {
@Override @Override
public void run() { public void run() {
try { checkRegistration();
JsonArray streams = c.getJsonArray("/stream_manager/");
Set<String> all = new HashSet<String>();
for (int i = 0; i < streams.size(); i ++) {
JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions");
for (int j = 0; j < sessions.size(); j++) {
String name = sessions.getJsonObject(j).getString("peer");
if (!instances.containsKey(name)) {
StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name));
instances.put(name, metrics);
}
all.add(name);
}
}
//removing deleted stream
for (String n : instances.keySet()) {
if (! all.contains(n)) {
instances.remove(n);
}
}
} catch (Exception e) {
// ignoring exceptions, will retry on the next interval
}
} }
} }
} }