From 5c33a8afa7289e768fcd2f3150f3d094688edb69 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 12:54:20 +0300 Subject: [PATCH 1/8] ColumnFamilyStore: Preparation for removing the pull mode This expose the ColumnFamilyStore registration via static method. It would allow an external object (ie. MBeanServer) to update the registration on demand. Signed-off-by: Amnon Heiman --- .../cassandra/db/ColumnFamilyStore.java | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java index 44d4d1a..ad62dad 100644 --- a/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/main/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -50,6 +50,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private String keyspace; private String name; private String mbeanName; + private static APIClient s_c = new APIClient(); static final int INTERVAL = 1000; // update every 1second public final ColumnFamilyMetrics metric; @@ -102,8 +103,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { + ",columnfamily=" + name; } + public static boolean checkRegistration() { + try { + JsonArray mbeans = s_c.getJsonArray("/column_family/"); + Set all_cf = new HashSet(); + for (int i = 0; i < mbeans.size(); i++) { + JsonObject mbean = mbeans.getJsonObject(i); + String name = getName(mbean.getString("type"), + mbean.getString("ks"), mbean.getString("cf")); + if (!cf.containsKey(name)) { + ColumnFamilyStore cfs = new ColumnFamilyStore( + mbean.getString("type"), mbean.getString("ks"), + mbean.getString("cf")); + cf.put(name, cfs); + } + all_cf.add(name); + } + // removing deleted column family + for (String n : cf.keySet()) { + if (!all_cf.contains(n)) { + cf.remove(n); + } + } + } catch (IllegalStateException e) { + return false; + } + return true; + } + private static final class CheckRegistration extends TimerTask { - private APIClient c = new APIClient(); private int missed_response = 0; // After MAX_RETRY retry we assume the API is not available // and the jmx will shutdown @@ -111,31 +139,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { @Override public void run() { try { - JsonArray mbeans = c.getJsonArray("/column_family/"); - Set all_cf = new HashSet(); - for (int i = 0; i < mbeans.size(); i++) { - JsonObject mbean = mbeans.getJsonObject(i); - String name = getName(mbean.getString("type"), - mbean.getString("ks"), mbean.getString("cf")); - if (!cf.containsKey(name)) { - ColumnFamilyStore cfs = new ColumnFamilyStore( - mbean.getString("type"), mbean.getString("ks"), - mbean.getString("cf")); - cf.put(name, cfs); + if (checkRegistration()) { + missed_response = 0; + } else { + if (missed_response++ > MAX_RETRY) { + System.err.println("API is not available, JMX is shuting down"); + System.exit(-1); } - all_cf.add(name); - } - // removing deleted column family - for (String n : cf.keySet()) { - if (!all_cf.contains(n)) { - cf.remove(n); - } - } - missed_response = 0; - } catch (IllegalStateException e) { - if (missed_response++ > MAX_RETRY) { - System.err.println("API is not available, JMX is shuting down"); - System.exit(-1); } } catch (Exception e) { // ignoring exceptions, will retry on the next interval From 0bfaba0a82da82a4864def6cebd594e153096171 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 14:46:43 +0300 Subject: [PATCH 2/8] StreamingMetrics: Preparation for removing pull mode This patch expose the check stream registration as an external static method. Signed-off-by: Amnon Heiman --- .../cassandra/metrics/StreamingMetrics.java | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java index dcd561a..ce389ce 100644 --- a/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java +++ b/src/main/java/org/apache/cassandra/metrics/StreamingMetrics.java @@ -55,6 +55,7 @@ public class StreamingMetrics public static final Counter totalOutgoingBytes = APIMetrics.newCounter("/stream_manager/metrics/outgoing", DefaultNameFactory.createMetricName(TYPE_NAME, "TotalOutgoingBytes", null)); public final Counter incomingBytes; public final Counter outgoingBytes; + private static APIClient s_c = new APIClient(); public static void register_mbeans() { TimerTask taskToExecute = new CheckRegistration(); @@ -68,34 +69,38 @@ public class StreamingMetrics outgoingBytes= APIMetrics.newCounter("/stream_manager/metrics/outgoing/" + peer, factory.createMetricName("OutgoingBytes")); } - private static final class CheckRegistration extends TimerTask { - private APIClient c = new APIClient(); + public static boolean checkRegistration() { + try { + JsonArray streams = s_c.getJsonArray("/stream_manager/"); + Set all = new HashSet(); + for (int i = 0; i < streams.size(); i ++) { + JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); + for (int j = 0; j < sessions.size(); j++) { + String name = sessions.getJsonObject(j).getString("peer"); + if (!instances.containsKey(name)) { + StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name)); + instances.put(name, metrics); + } + all.add(name); + } + } + //removing deleted stream + for (String n : instances.keySet()) { + if (! all.contains(n)) { + instances.remove(n); + } + } + } catch (Exception e) { + // ignoring exceptions, will retry on the next interval + return false; + } + return true; + } + private static final class CheckRegistration extends TimerTask { @Override public void run() { - try { - JsonArray streams = c.getJsonArray("/stream_manager/"); - Set all = new HashSet(); - for (int i = 0; i < streams.size(); i ++) { - JsonArray sessions = streams.getJsonObject(i).getJsonArray("sessions"); - for (int j = 0; j < sessions.size(); j++) { - String name = sessions.getJsonObject(j).getString("peer"); - if (!instances.containsKey(name)) { - StreamingMetrics metrics = new StreamingMetrics(InetAddress.getByName(name)); - instances.put(name, metrics); - } - all.add(name); - } - } - //removing deleted stream - for (String n : instances.keySet()) { - if (! all.contains(n)) { - instances.remove(n); - } - } - } catch (Exception e) { - // ignoring exceptions, will retry on the next interval - } + checkRegistration(); } } } From 5f17e6e0dbf1ad793151e84d3d7e4181c7df9b9a Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:34:13 +0300 Subject: [PATCH 3/8] pom: Add mx4j dependency The mx4j is used to implement the API MBean Server. Signed-off-by: Amnon Heiman --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index e58aecb..4ecfc53 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,11 @@ google-collections 1.0 + + mx4j + mx4j + 3.0.2 + From 645d04083c654c39c3373f8aff8bde080d57f420 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:06:37 +0300 Subject: [PATCH 4/8] APIMBeanIntrospector: Creating an introspector for the MBeanserver The MX4J introspector does not support *MXBean interfaces name, This causes a problem with the garbage collector and java related MBeans. To bypass that limitation the APIMBeanIntrospector inherit from MBeanIntrospector and override the relevant functionality so MXBean will be treated like MBean. Signed-off-by: Amnon Heiman --- .../jmx/utils/APIMBeanIntrospector.java | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java new file mode 100644 index 0000000..738b2c8 --- /dev/null +++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanIntrospector.java @@ -0,0 +1,103 @@ +package com.scylladb.jmx.utils; +/** + * Copyright (C) The MX4J Contributors. + * All rights reserved. + * + * This software is distributed under the terms of the MX4J License version 1.0. + * See the terms of the MX4J License in the documentation provided with this software. + */ + +/** + * Modified by ScyllaDB + * Copyright 2016 ScyllaDB + */ +/* +* This file is part of Scylla. +* +* Scylla is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Scylla is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with Scylla. If not, see . +*/ + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import javax.management.MBeanInfo; + +import mx4j.server.MBeanIntrospector; +import mx4j.server.MBeanMetaData; + +public class APIMBeanIntrospector extends MBeanIntrospector { + private static final java.util.logging.Logger logger = java.util.logging.Logger + .getLogger(APIMBeanIntrospector.class.getName()); + + public boolean isMBeanCompliant(MBeanMetaData metadata) { + Class info = metadata.getMBeanInterface(); + if (info != null) { + String cn = info.getName(); + if (cn != null) { + if (cn.endsWith("MXBean")) { + return true; + } + } + } + return super.isMBeanCompliant(metadata); + } + + public void apiIntrospectStandardMBean(MBeanMetaData metadata) { + try { + Class[] cArg = new Class[1]; + cArg[0] = MBeanMetaData.class; + Method met = MBeanIntrospector.class + .getDeclaredMethod("introspectStandardMBean", cArg); + met.setAccessible(true); + met.invoke((MBeanIntrospector) this, metadata); + } catch (NoSuchMethodException | SecurityException + | IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + logger.warning("Failed setting mbean info " + e.getMessage()); + } + } + + public void apiIntrospect(MBeanMetaData metadata) { + apiIntrospectStandardMBean(metadata); + Class[] cArg = new Class[1]; + cArg[0] = MBeanMetaData.class; + try { + Method met = MBeanIntrospector.class + .getDeclaredMethod("createStandardMBeanInfo", cArg); + met.setAccessible(true); + Object info = met.invoke((MBeanIntrospector) this, metadata); + metadata.setMBeanInfo((MBeanInfo) info); + } catch (IllegalAccessException | NoSuchMethodException + | SecurityException | IllegalArgumentException + | InvocationTargetException e) { + logger.warning("Failed setting mbean info" + e.getMessage()); + } + } + + public void introspect(MBeanMetaData metadata) { + Class mx_mbean = null; + for (Class it : metadata.getMBean().getClass().getInterfaces()) { + if (it.getName().endsWith("MXBean")) { + mx_mbean = it; + break; + } + } + if (mx_mbean != null) { + metadata.setMBeanInterface(mx_mbean); + apiIntrospect(metadata); + return; + } + super.introspect(metadata); + } +} From 4e02c52aeeb5e2b8e29d67ddcd37838a11ba224f Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:14:54 +0300 Subject: [PATCH 5/8] Adding the APIMBeanServer The APIMBeanServer is serve as a proxy for the MBeanServer. It intercept calls to the MBeanServer and check for the column family and stream registeration before they are perform. Current implementation override queryNames as it's the one that is being used by nodetool. Additional methods can be override in the future if needed. Signed-off-by: Amnon Heiman --- .../scylladb/jmx/utils/APIMBeanServer.java | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java diff --git a/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java new file mode 100644 index 0000000..920b863 --- /dev/null +++ b/src/main/java/com/scylladb/jmx/utils/APIMBeanServer.java @@ -0,0 +1,66 @@ +package com.scylladb.jmx.utils; + +/** + * Copyright 2016 ScyllaDB + */ +/* +* This file is part of Scylla. +* +* Scylla is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Scylla is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with Scylla. If not, see . +*/ +import java.lang.reflect.Field; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.QueryExp; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.metrics.StreamingMetrics; + +import mx4j.server.ChainedMBeanServer; + +public class APIMBeanServer extends ChainedMBeanServer { + private static final java.util.logging.Logger logger = java.util.logging.Logger + .getLogger(APIMBeanServer.class.getName()); + + public static void log(String str) { + logger.finest(str); + } + + public void setMBeanServer(MBeanServer server) { + if (server != null) { + try { + Field f = server.getClass().getDeclaredField("introspector"); + f.setAccessible(true); + f.set(server, new APIMBeanIntrospector()); + } catch (Exception e) { + logger.warning( + "Failed setting new interceptor" + e.getMessage()); + } + } + super.setMBeanServer(server); + } + + @Override + public Set queryNames(ObjectName name, QueryExp query) { + if (name.getCanonicalKeyPropertyListString() + .contains("ColumnFamilies")) { + ColumnFamilyStore.checkRegistration(); + } else if (name.getCanonicalKeyPropertyListString() + .contains("Stream")) { + StreamingMetrics.checkRegistration(); + } + return super.queryNames(name, query); + } +} \ No newline at end of file From 1daa5eb0307ce57d11c941336a2093b993c1495d Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:21:55 +0300 Subject: [PATCH 6/8] Adding the APIBuilder The APIBuilder is an implementation for the MBeanServerBuilder that is used to instantiate the APIMBeanServer as the platform MBeanServer. Signed-off-by: Amnon Heiman --- .../com/scylladb/jmx/utils/APIBuilder.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 src/main/java/com/scylladb/jmx/utils/APIBuilder.java diff --git a/src/main/java/com/scylladb/jmx/utils/APIBuilder.java b/src/main/java/com/scylladb/jmx/utils/APIBuilder.java new file mode 100644 index 0000000..941a2bc --- /dev/null +++ b/src/main/java/com/scylladb/jmx/utils/APIBuilder.java @@ -0,0 +1,41 @@ +package com.scylladb.jmx.utils; +/** + * Copyright 2016 ScyllaDB + */ + +/* +* This file is part of Scylla. +* +* Scylla is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* Scylla is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with Scylla. If not, see . +*/ + +import javax.management.MBeanServer; +import javax.management.MBeanServerDelegate; + +import mx4j.server.ChainedMBeanServerBuilder; + +public class APIBuilder extends ChainedMBeanServerBuilder { + public APIBuilder() { + super(new mx4j.server.MX4JMBeanServerBuilder()); + } + + public MBeanServer newMBeanServer(String defaultDomain, MBeanServer outer, + MBeanServerDelegate delegate) { + APIMBeanServer extern = new APIMBeanServer(); + MBeanServer nested = getMBeanServerBuilder().newMBeanServer( + defaultDomain, outer == null ? extern : outer, delegate); + extern.setMBeanServer(nested); + return extern; + } +} \ No newline at end of file From 3e95c89310735ffdb8427bbd40a4e2fb772c36b1 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:28:40 +0300 Subject: [PATCH 7/8] RMIServerSocketFactoryImpl: regsiter the APIBuilder This register the APIBuilder as the MBeanServerBuilder which will cause the APIMBeanServer to be used as the MBeanServer. Signed-off-by: Amnon Heiman --- .../com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java b/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java index 3135d34..a234e56 100644 --- a/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java +++ b/src/main/java/com/scylladb/jmx/utils/RMIServerSocketFactoryImpl.java @@ -41,6 +41,9 @@ public class RMIServerSocketFactoryImpl implements RMIServerSocketFactory { public static JMXConnectorServer jmxServer = null; public static void maybeInitJmx() { + System.setProperty("javax.management.builder.initial", "com.scylladb.jmx.utils.APIBuilder"); + System.setProperty("mx4j.strict.mbean.interface", "no"); + String jmxPort = System .getProperty("com.sun.management.jmxremote.port"); From 50c8ff548fc95db9f1cd9dc3499fb550ad2ee4a4 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 May 2016 13:31:07 +0300 Subject: [PATCH 8/8] Main: remove the pulling registration With the addition of the APIMBeanServer there is no longer a need for the pulling functionality to be perform for MBean registration. Signed-off-by: Amnon Heiman --- src/main/java/com/scylladb/jmx/main/Main.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/com/scylladb/jmx/main/Main.java b/src/main/java/com/scylladb/jmx/main/Main.java index 91e31f1..414aeac 100644 --- a/src/main/java/com/scylladb/jmx/main/Main.java +++ b/src/main/java/com/scylladb/jmx/main/Main.java @@ -6,13 +6,11 @@ package com.scylladb.jmx.main; import com.scylladb.jmx.api.APIConfig; import com.scylladb.jmx.utils.RMIServerSocketFactoryImpl; -import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.EndpointSnitchInfo; -import org.apache.cassandra.metrics.StreamingMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.GCInspector; @@ -33,11 +31,9 @@ public class Main { Gossiper.getInstance(); EndpointSnitchInfo.getInstance(); FailureDetector.getInstance(); - ColumnFamilyStore.register_mbeans(); CacheService.getInstance(); CompactionManager.getInstance(); GCInspector.register(); - StreamingMetrics.register_mbeans(); Thread.sleep(Long.MAX_VALUE); }