MessagingService add dropped and recently dropped messages impl

This patch adds the implementation of the dropped messages and the
recent dropped messages.

The MessagingService holds a timer that periodically load the dropped
messages from the API and distribute the results between the
DroppedMessagesMetrics instances.

This mimic the timer behaviour in origin, only it does one API call for
all Verb.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
This commit is contained in:
Amnon Heiman 2015-11-16 11:56:40 +02:00
parent 896fd64de9
commit db7aad26f5
1 changed files with 86 additions and 4 deletions

View File

@ -27,25 +27,78 @@ import java.net.*;
import java.util.*;
import java.util.Map.Entry;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
import com.cloudius.urchin.api.APIClient;
import com.yammer.metrics.core.APISettableMeter;
public final class MessagingService implements MessagingServiceMBean {
static final int INTERVAL = 1000; // update every 1second
public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
private static final java.util.logging.Logger logger = java.util.logging.Logger
.getLogger(MessagingService.class.getName());
Map<String, DroppedMessageMetrics> dropped;
private APIClient c = new APIClient();
private final ObjectName jmxObjectName;
/* All verb handler identifiers */
public enum Verb
{
MUTATION,
@Deprecated BINARY,
READ_REPAIR,
READ,
REQUEST_RESPONSE, // client-initiated reads and writes
@Deprecated STREAM_INITIATE,
@Deprecated STREAM_INITIATE_DONE,
@Deprecated STREAM_REPLY,
@Deprecated STREAM_REQUEST,
RANGE_SLICE,
@Deprecated BOOTSTRAP_TOKEN,
@Deprecated TREE_REQUEST,
@Deprecated TREE_RESPONSE,
@Deprecated JOIN,
GOSSIP_DIGEST_SYN,
GOSSIP_DIGEST_ACK,
GOSSIP_DIGEST_ACK2,
@Deprecated DEFINITIONS_ANNOUNCE,
DEFINITIONS_UPDATE,
TRUNCATE,
SCHEMA_CHECK,
@Deprecated INDEX_SCAN,
REPLICATION_FINISHED,
INTERNAL_RESPONSE, // responses to internal calls
COUNTER_MUTATION,
@Deprecated STREAMING_REPAIR_REQUEST,
@Deprecated STREAMING_REPAIR_RESPONSE,
SNAPSHOT, // Similar to nt snapshot
MIGRATION_REQUEST,
GOSSIP_SHUTDOWN,
_TRACE, // dummy verb so we can use MS.droppedMessages
ECHO,
REPAIR_MESSAGE,
// use as padding for backwards compatability where a previous version needs to validate a verb from the future.
PAXOS_PREPARE,
PAXOS_PROPOSE,
PAXOS_COMMIT,
PAGED_RANGE,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
UNUSED_3,
;
}
public void log(String str) {
System.out.println(str);
logger.info(str);
}
private static Timer timer = new Timer("Dropped messages");
public MessagingService() {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try {
@ -56,10 +109,30 @@ public final class MessagingService implements MessagingServiceMBean {
} catch (Exception e) {
throw new RuntimeException(e);
}
timer.schedule(new CheckDroppedMessages(), INTERVAL, INTERVAL);
}
static MessagingService instance = new MessagingService();
private static final class CheckDroppedMessages extends TimerTask {
@Override
public void run() {
if (instance.dropped == null) {
instance.dropped = new HashMap<String, DroppedMessageMetrics>();
for (Verb v : Verb.values()) {
instance.dropped.put(v.name(), new DroppedMessageMetrics(v));
}
}
Map<String, Integer> val = instance.getDroppedMessages();
for (String k : val.keySet()) {
APISettableMeter meter = instance.dropped.get(k).getMeter();
meter.set(val.get(k));
System.out.println("tick " + k + " " + meter.count());
meter.tick();
}
}
}
public static MessagingService getInstance() {
return instance;
}
@ -111,7 +184,13 @@ public final class MessagingService implements MessagingServiceMBean {
*/
public Map<String, Integer> getDroppedMessages() {
log(" getDroppedMessages()");
return c.getMapStringIntegerValue("/messaging_service/messages/dropped");
Map<String, Integer> res = new HashMap<String, Integer>();
JsonArray arr = c.getJsonArray("/messaging_service/messages/dropped_by_ver");
for (int i = 0; i < arr.size(); i++) {
JsonObject obj = arr.getJsonObject(i);
res.put(obj.getString("verb"), obj.getInt("count"));
}
return res;
}
/**
@ -119,7 +198,10 @@ public final class MessagingService implements MessagingServiceMBean {
*/
public Map<String, Integer> getRecentlyDroppedMessages() {
log(" getRecentlyDroppedMessages()");
return c.getMapStringIntegerValue("");
Map<String, Integer> map = new HashMap<String, Integer>();
for (Map.Entry<String, DroppedMessageMetrics> entry : dropped.entrySet())
map.put(entry.getKey(), entry.getValue().getRecentlyDropped());
return map;
}
/**