From db7aad26f502b18b819870d0f1235b16e4f14041 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 16 Nov 2015 11:56:40 +0200 Subject: [PATCH] MessagingService add dropped and recently dropped messages impl This patch adds the implementation of the dropped messages and the recent dropped messages. The MessagingService holds a timer that periodically load the dropped messages from the API and distribute the results between the DroppedMessagesMetrics instances. This mimic the timer behaviour in origin, only it does one API call for all Verb. Signed-off-by: Amnon Heiman --- .../cassandra/net/MessagingService.java | 90 ++++++++++++++++++- 1 file changed, 86 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/cassandra/net/MessagingService.java b/src/main/java/org/apache/cassandra/net/MessagingService.java index 01bf6a3..c9dd910 100644 --- a/src/main/java/org/apache/cassandra/net/MessagingService.java +++ b/src/main/java/org/apache/cassandra/net/MessagingService.java @@ -27,25 +27,78 @@ import java.net.*; import java.util.*; import java.util.Map.Entry; +import javax.json.JsonArray; +import javax.json.JsonObject; import javax.management.MBeanServer; import javax.management.ObjectName; - +import org.apache.cassandra.metrics.DroppedMessageMetrics; import com.cloudius.urchin.api.APIClient; +import com.yammer.metrics.core.APISettableMeter; public final class MessagingService implements MessagingServiceMBean { + static final int INTERVAL = 1000; // update every 1second public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; private static final java.util.logging.Logger logger = java.util.logging.Logger .getLogger(MessagingService.class.getName()); - + Map dropped; private APIClient c = new APIClient(); private final ObjectName jmxObjectName; + /* All verb handler identifiers */ + public enum Verb + { + MUTATION, + @Deprecated BINARY, + READ_REPAIR, + READ, + REQUEST_RESPONSE, // client-initiated reads and writes + @Deprecated STREAM_INITIATE, + @Deprecated STREAM_INITIATE_DONE, + @Deprecated STREAM_REPLY, + @Deprecated STREAM_REQUEST, + RANGE_SLICE, + @Deprecated BOOTSTRAP_TOKEN, + @Deprecated TREE_REQUEST, + @Deprecated TREE_RESPONSE, + @Deprecated JOIN, + GOSSIP_DIGEST_SYN, + GOSSIP_DIGEST_ACK, + GOSSIP_DIGEST_ACK2, + @Deprecated DEFINITIONS_ANNOUNCE, + DEFINITIONS_UPDATE, + TRUNCATE, + SCHEMA_CHECK, + @Deprecated INDEX_SCAN, + REPLICATION_FINISHED, + INTERNAL_RESPONSE, // responses to internal calls + COUNTER_MUTATION, + @Deprecated STREAMING_REPAIR_REQUEST, + @Deprecated STREAMING_REPAIR_RESPONSE, + SNAPSHOT, // Similar to nt snapshot + MIGRATION_REQUEST, + GOSSIP_SHUTDOWN, + _TRACE, // dummy verb so we can use MS.droppedMessages + ECHO, + REPAIR_MESSAGE, + // use as padding for backwards compatability where a previous version needs to validate a verb from the future. + PAXOS_PREPARE, + PAXOS_PROPOSE, + PAXOS_COMMIT, + PAGED_RANGE, + // remember to add new verbs at the end, since we serialize by ordinal + UNUSED_1, + UNUSED_2, + UNUSED_3, + ; + } public void log(String str) { System.out.println(str); logger.info(str); } + private static Timer timer = new Timer("Dropped messages"); + public MessagingService() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { @@ -56,10 +109,30 @@ public final class MessagingService implements MessagingServiceMBean { } catch (Exception e) { throw new RuntimeException(e); } + timer.schedule(new CheckDroppedMessages(), INTERVAL, INTERVAL); } static MessagingService instance = new MessagingService(); + private static final class CheckDroppedMessages extends TimerTask { + @Override + public void run() { + if (instance.dropped == null) { + instance.dropped = new HashMap(); + for (Verb v : Verb.values()) { + instance.dropped.put(v.name(), new DroppedMessageMetrics(v)); + } + } + Map val = instance.getDroppedMessages(); + for (String k : val.keySet()) { + APISettableMeter meter = instance.dropped.get(k).getMeter(); + meter.set(val.get(k)); + System.out.println("tick " + k + " " + meter.count()); + meter.tick(); + } + } + } + public static MessagingService getInstance() { return instance; } @@ -111,7 +184,13 @@ public final class MessagingService implements MessagingServiceMBean { */ public Map getDroppedMessages() { log(" getDroppedMessages()"); - return c.getMapStringIntegerValue("/messaging_service/messages/dropped"); + Map res = new HashMap(); + JsonArray arr = c.getJsonArray("/messaging_service/messages/dropped_by_ver"); + for (int i = 0; i < arr.size(); i++) { + JsonObject obj = arr.getJsonObject(i); + res.put(obj.getString("verb"), obj.getInt("count")); + } + return res; } /** @@ -119,7 +198,10 @@ public final class MessagingService implements MessagingServiceMBean { */ public Map getRecentlyDroppedMessages() { log(" getRecentlyDroppedMessages()"); - return c.getMapStringIntegerValue(""); + Map map = new HashMap(); + for (Map.Entry entry : dropped.entrySet()) + map.put(entry.getKey(), entry.getValue().getRecentlyDropped()); + return map; } /**