diff --git a/src/main/java/org/apache/cassandra/net/MessagingService.java b/src/main/java/org/apache/cassandra/net/MessagingService.java index 01bf6a3..c9dd910 100644 --- a/src/main/java/org/apache/cassandra/net/MessagingService.java +++ b/src/main/java/org/apache/cassandra/net/MessagingService.java @@ -27,25 +27,78 @@ import java.net.*; import java.util.*; import java.util.Map.Entry; +import javax.json.JsonArray; +import javax.json.JsonObject; import javax.management.MBeanServer; import javax.management.ObjectName; - +import org.apache.cassandra.metrics.DroppedMessageMetrics; import com.cloudius.urchin.api.APIClient; +import com.yammer.metrics.core.APISettableMeter; public final class MessagingService implements MessagingServiceMBean { + static final int INTERVAL = 1000; // update every 1second public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; private static final java.util.logging.Logger logger = java.util.logging.Logger .getLogger(MessagingService.class.getName()); - + Map dropped; private APIClient c = new APIClient(); private final ObjectName jmxObjectName; + /* All verb handler identifiers */ + public enum Verb + { + MUTATION, + @Deprecated BINARY, + READ_REPAIR, + READ, + REQUEST_RESPONSE, // client-initiated reads and writes + @Deprecated STREAM_INITIATE, + @Deprecated STREAM_INITIATE_DONE, + @Deprecated STREAM_REPLY, + @Deprecated STREAM_REQUEST, + RANGE_SLICE, + @Deprecated BOOTSTRAP_TOKEN, + @Deprecated TREE_REQUEST, + @Deprecated TREE_RESPONSE, + @Deprecated JOIN, + GOSSIP_DIGEST_SYN, + GOSSIP_DIGEST_ACK, + GOSSIP_DIGEST_ACK2, + @Deprecated DEFINITIONS_ANNOUNCE, + DEFINITIONS_UPDATE, + TRUNCATE, + SCHEMA_CHECK, + @Deprecated INDEX_SCAN, + REPLICATION_FINISHED, + INTERNAL_RESPONSE, // responses to internal calls + COUNTER_MUTATION, + @Deprecated STREAMING_REPAIR_REQUEST, + @Deprecated STREAMING_REPAIR_RESPONSE, + SNAPSHOT, // Similar to nt snapshot + MIGRATION_REQUEST, + GOSSIP_SHUTDOWN, + _TRACE, // dummy verb so we can use MS.droppedMessages + ECHO, + REPAIR_MESSAGE, + // use as padding for backwards compatability where a previous version needs to validate a verb from the future. + PAXOS_PREPARE, + PAXOS_PROPOSE, + PAXOS_COMMIT, + PAGED_RANGE, + // remember to add new verbs at the end, since we serialize by ordinal + UNUSED_1, + UNUSED_2, + UNUSED_3, + ; + } public void log(String str) { System.out.println(str); logger.info(str); } + private static Timer timer = new Timer("Dropped messages"); + public MessagingService() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { @@ -56,10 +109,30 @@ public final class MessagingService implements MessagingServiceMBean { } catch (Exception e) { throw new RuntimeException(e); } + timer.schedule(new CheckDroppedMessages(), INTERVAL, INTERVAL); } static MessagingService instance = new MessagingService(); + private static final class CheckDroppedMessages extends TimerTask { + @Override + public void run() { + if (instance.dropped == null) { + instance.dropped = new HashMap(); + for (Verb v : Verb.values()) { + instance.dropped.put(v.name(), new DroppedMessageMetrics(v)); + } + } + Map val = instance.getDroppedMessages(); + for (String k : val.keySet()) { + APISettableMeter meter = instance.dropped.get(k).getMeter(); + meter.set(val.get(k)); + System.out.println("tick " + k + " " + meter.count()); + meter.tick(); + } + } + } + public static MessagingService getInstance() { return instance; } @@ -111,7 +184,13 @@ public final class MessagingService implements MessagingServiceMBean { */ public Map getDroppedMessages() { log(" getDroppedMessages()"); - return c.getMapStringIntegerValue("/messaging_service/messages/dropped"); + Map res = new HashMap(); + JsonArray arr = c.getJsonArray("/messaging_service/messages/dropped_by_ver"); + for (int i = 0; i < arr.size(); i++) { + JsonObject obj = arr.getJsonObject(i); + res.put(obj.getString("verb"), obj.getInt("count")); + } + return res; } /** @@ -119,7 +198,10 @@ public final class MessagingService implements MessagingServiceMBean { */ public Map getRecentlyDroppedMessages() { log(" getRecentlyDroppedMessages()"); - return c.getMapStringIntegerValue(""); + Map map = new HashMap(); + for (Map.Entry entry : dropped.entrySet()) + map.put(entry.getKey(), entry.getValue().getRecentlyDropped()); + return map; } /**