From fec8b44942473a446aa9c29c1c6debde0e9930e4 Mon Sep 17 00:00:00 2001 From: elcallio Date: Tue, 11 Oct 2016 14:06:33 +0200 Subject: [PATCH] Rework MessagingService --- .../cassandra/net/MessagingService.java | 154 +++++++----------- 1 file changed, 58 insertions(+), 96 deletions(-) diff --git a/src/main/java/org/apache/cassandra/net/MessagingService.java b/src/main/java/org/apache/cassandra/net/MessagingService.java index a663030..4bbae71 100644 --- a/src/main/java/org/apache/cassandra/net/MessagingService.java +++ b/src/main/java/org/apache/cassandra/net/MessagingService.java @@ -24,105 +24,60 @@ package org.apache.cassandra.net; import static java.util.Collections.emptyMap; -import java.lang.management.ManagementFactory; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.json.JsonArray; import javax.json.JsonObject; -import javax.management.MBeanServer; -import javax.management.ObjectName; import org.apache.cassandra.metrics.DroppedMessageMetrics; import com.scylladb.jmx.api.APIClient; +import com.scylladb.jmx.metrics.MetricsMBean; -public final class MessagingService implements MessagingServiceMBean { - static final int INTERVAL = 1000; // update every 1second +public final class MessagingService extends MetricsMBean implements MessagingServiceMBean { public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; - private static final java.util.logging.Logger logger = java.util.logging.Logger - .getLogger(MessagingService.class.getName()); - Map dropped; - private APIClient c = new APIClient(); - Map resent_timeout = new HashMap(); - private final ObjectName jmxObjectName; + private static final Logger logger = Logger.getLogger(MessagingService.class.getName()); + + private Map resentTimeouts = new HashMap(); private long recentTimeoutCount; /* All verb handler identifiers */ - public enum Verb - { - MUTATION, - @Deprecated BINARY, - READ_REPAIR, - READ, - REQUEST_RESPONSE, // client-initiated reads and writes - @Deprecated STREAM_INITIATE, - @Deprecated STREAM_INITIATE_DONE, - @Deprecated STREAM_REPLY, - @Deprecated STREAM_REQUEST, - RANGE_SLICE, - @Deprecated BOOTSTRAP_TOKEN, - @Deprecated TREE_REQUEST, - @Deprecated TREE_RESPONSE, - @Deprecated JOIN, - GOSSIP_DIGEST_SYN, - GOSSIP_DIGEST_ACK, - GOSSIP_DIGEST_ACK2, - @Deprecated DEFINITIONS_ANNOUNCE, - DEFINITIONS_UPDATE, - TRUNCATE, - SCHEMA_CHECK, - @Deprecated INDEX_SCAN, - REPLICATION_FINISHED, - INTERNAL_RESPONSE, // responses to internal calls - COUNTER_MUTATION, - @Deprecated STREAMING_REPAIR_REQUEST, - @Deprecated STREAMING_REPAIR_RESPONSE, - SNAPSHOT, // Similar to nt snapshot - MIGRATION_REQUEST, - GOSSIP_SHUTDOWN, - _TRACE, // dummy verb so we can use MS.droppedMessages - ECHO, - REPAIR_MESSAGE, - // use as padding for backwards compatability where a previous version needs to validate a verb from the future. - PAXOS_PREPARE, - PAXOS_PROPOSE, - PAXOS_COMMIT, - PAGED_RANGE, + public enum Verb { + MUTATION, @Deprecated BINARY, READ_REPAIR, READ, REQUEST_RESPONSE, // client-initiated + // reads + // and + // writes + @Deprecated STREAM_INITIATE, @Deprecated STREAM_INITIATE_DONE, @Deprecated STREAM_REPLY, @Deprecated STREAM_REQUEST, RANGE_SLICE, @Deprecated BOOTSTRAP_TOKEN, @Deprecated TREE_REQUEST, @Deprecated TREE_RESPONSE, @Deprecated JOIN, GOSSIP_DIGEST_SYN, GOSSIP_DIGEST_ACK, GOSSIP_DIGEST_ACK2, @Deprecated DEFINITIONS_ANNOUNCE, DEFINITIONS_UPDATE, TRUNCATE, SCHEMA_CHECK, @Deprecated INDEX_SCAN, REPLICATION_FINISHED, INTERNAL_RESPONSE, // responses + // to + // internal + // calls + COUNTER_MUTATION, @Deprecated STREAMING_REPAIR_REQUEST, @Deprecated STREAMING_REPAIR_RESPONSE, SNAPSHOT, // Similar + // to + // nt + // snapshot + MIGRATION_REQUEST, GOSSIP_SHUTDOWN, _TRACE, // dummy verb so we can use + // MS.droppedMessages + ECHO, REPAIR_MESSAGE, + // use as padding for backwards compatability where a previous version + // needs to validate a verb from the future. + PAXOS_PREPARE, PAXOS_PROPOSE, PAXOS_COMMIT, PAGED_RANGE, // remember to add new verbs at the end, since we serialize by ordinal - UNUSED_1, - UNUSED_2, - UNUSED_3, - ; + UNUSED_1, UNUSED_2, UNUSED_3,; } public void log(String str) { logger.finest(str); } - public MessagingService() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try { - jmxObjectName = new ObjectName(MBEAN_NAME); - mbs.registerMBean(this, jmxObjectName); - dropped = new HashMap(); - for (Verb v : Verb.values()) { - dropped.put(v.name(), new DroppedMessageMetrics(v)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - static MessagingService instance; - - public static MessagingService getInstance() { - if (instance == null) { - instance = new MessagingService(); - } - return instance; + public MessagingService(APIClient client) { + super(MBEAN_NAME, client, + Stream.of(Verb.values()).map(v -> new DroppedMessageMetrics(v)).collect(Collectors.toList())); } /** @@ -131,7 +86,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getCommandPendingTasks() { log(" getCommandPendingTasks()"); - return c.getMapStringIntegerValue("/messaging_service/messages/pending"); + return client.getMapStringIntegerValue("/messaging_service/messages/pending"); } /** @@ -140,8 +95,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getCommandCompletedTasks() { log("getCommandCompletedTasks()"); - Map res = c - .getListMapStringLongValue("/messaging_service/messages/sent"); + Map res = client.getListMapStringLongValue("/messaging_service/messages/sent"); return res; } @@ -151,7 +105,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getCommandDroppedTasks() { log(" getCommandDroppedTasks()"); - return c.getMapStringLongValue("/messaging_service/messages/dropped"); + return client.getMapStringLongValue("/messaging_service/messages/dropped"); } /** @@ -160,7 +114,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getResponsePendingTasks() { log(" getResponsePendingTasks()"); - return c.getMapStringIntegerValue("/messaging_service/messages/respond_pending"); + return client.getMapStringIntegerValue("/messaging_service/messages/respond_pending"); } /** @@ -169,7 +123,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getResponseCompletedTasks() { log(" getResponseCompletedTasks()"); - return c.getMapStringLongValue("/messaging_service/messages/respond_completed"); + return client.getMapStringLongValue("/messaging_service/messages/respond_completed"); } /** @@ -179,7 +133,7 @@ public final class MessagingService implements MessagingServiceMBean { public Map getDroppedMessages() { log(" getDroppedMessages()"); Map res = new HashMap(); - JsonArray arr = c.getJsonArray("/messaging_service/messages/dropped_by_ver"); + JsonArray arr = client.getJsonArray("/messaging_service/messages/dropped_by_ver"); for (int i = 0; i < arr.size(); i++) { JsonObject obj = arr.getJsonObject(i); res.put(obj.getString("verb"), obj.getInt("count")); @@ -187,17 +141,26 @@ public final class MessagingService implements MessagingServiceMBean { return res; } + private Map recent; + /** * dropped message counts since last called */ - @SuppressWarnings("deprecation") @Override public Map getRecentlyDroppedMessages() { log(" getRecentlyDroppedMessages()"); - Map map = new HashMap(); - for (Map.Entry entry : dropped.entrySet()) - map.put(entry.getKey(), entry.getValue().getRecentlyDropped()); - return map; + + Map dropped = getDroppedMessages(), result = new HashMap<>(dropped), old = recent; + + recent = dropped; + + if (old != null) { + for (Map.Entry e : old.entrySet()) { + result.put(e.getKey(), result.get(e.getKey()) - e.getValue()); + } + } + + return result; } /** @@ -220,7 +183,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public Map getTimeoutsPerHost() { log(" getTimeoutsPerHost()"); - return c.getMapStringLongValue("/messaging_service/messages/timeout"); + return client.getMapStringLongValue("/messaging_service/messages/timeout"); } /** @@ -243,12 +206,11 @@ public final class MessagingService implements MessagingServiceMBean { log(" getRecentTimeoutsPerHost()"); Map timeouts = getTimeoutsPerHost(); Map result = new HashMap(); - for ( Entry e : timeouts.entrySet()) { - long res = e.getValue().longValue() - - ((resent_timeout.containsKey(e.getKey()))? (resent_timeout.get(e.getKey())).longValue() - : 0); - resent_timeout.put(e.getKey(), e.getValue()); - result.put(e.getKey(),res); + for (Entry e : timeouts.entrySet()) { + long res = e.getValue().longValue() + - ((resentTimeouts.containsKey(e.getKey())) ? (resentTimeouts.get(e.getKey())).longValue() : 0); + resentTimeouts.put(e.getKey(), e.getValue()); + result.put(e.getKey(), res); } return result; } @@ -256,7 +218,7 @@ public final class MessagingService implements MessagingServiceMBean { @Override public int getVersion(String address) throws UnknownHostException { log(" getVersion(String address) throws UnknownHostException"); - return c.getIntValue(""); + return client.getIntValue(""); } @Override