From 4ed049739a2778b76b420d240da890b56f5e1e34 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 17 Oct 2016 11:34:21 +0000 Subject: [PATCH] Storage service: Fix 3.x style notifications (repair) --- .../cassandra/service/StorageService.java | 143 +++++++++++++++--- 1 file changed, 119 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index 7b7aa7e..4ea72ed 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -22,6 +22,8 @@ */ package org.apache.cassandra.service; +import static java.util.Arrays.asList; + import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -672,10 +674,12 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, private String message; private MultivaluedMap queryParams = new MultivaluedHashMap(); private int cmd; + private final boolean legacy; - public CheckRepair(int id, String keyspace) { + public CheckRepair(int id, String keyspace, boolean legacy) { this.id = id; this.keyspace = keyspace; + this.legacy = legacy; APIClient.set_query_param(queryParams, "id", Integer.toString(id)); message = String.format("Repair session %d ", id); // The returned id is the command number @@ -687,11 +691,12 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, String status = client.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); if (!status.equals("RUNNING")) { cancel(); - if (!status.equals("SUCCESSFUL")) { - sendNotification("repair", message + "failed", - new int[] { cmd, RepairStatus.SESSION_FAILED.ordinal() }); + if (status.equals("SUCCESSFUL")) { + sendMessage(cmd, RepairStatus.SESSION_SUCCESS, message, legacy); + } else { + sendMessage(cmd, RepairStatus.SESSION_FAILED, message + "failed", legacy); } - sendNotification("repair", message + "finished", new int[] { cmd, RepairStatus.FINISHED.ordinal() }); + sendMessage(cmd, RepairStatus.FINISHED, message + "finished", legacy); } } @@ -725,14 +730,57 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, * * @param repair */ - public int waitAndNotifyRepair(int cmd, String keyspace, String message) { + private int waitAndNotifyRepair(int cmd, String keyspace, String message, boolean legacy) { logger.finest(message); - sendNotification("repair", message, new int[] { cmd, RepairStatus.STARTED.ordinal() }); - TimerTask taskToExecute = new CheckRepair(cmd, keyspace); + + sendMessage(cmd, RepairStatus.STARTED, message, legacy); + + TimerTask taskToExecute = new CheckRepair(cmd, keyspace, legacy); timer.schedule(taskToExecute, 100, 1000); return cmd; } + // See org.apache.cassandra.utils.progress.ProgressEventType + private static enum ProgressEventType { + START, PROGRESS, ERROR, ABORT, SUCCESS, COMPLETE, NOTIFICATION + } + + private void sendMessage(int cmd, RepairStatus status, String message, boolean legacy) { + String tag = "repair:" + cmd; + + ProgressEventType type = ProgressEventType.ERROR; + int total = 100; + int count = 0; + switch (status) { + case STARTED: + type = ProgressEventType.START; + break; + case FINISHED: + type = ProgressEventType.COMPLETE; + count = 100; + break; + case SESSION_SUCCESS: + type = ProgressEventType.SUCCESS; + count = 100; + break; + default: + break; + } + + Notification jmxNotification = new Notification("progress", tag, notificationSerialNumber.incrementAndGet(), + message); + Map userData = new HashMap<>(); + userData.put("type", type.ordinal()); + userData.put("progressCount", count); + userData.put("total", total); + jmxNotification.setUserData(userData); + sendNotification(jmxNotification); + + if (legacy) { + sendNotification("repair", message, new int[] { cmd, status.ordinal() }); + } + } + /** * Invoke repair asynchronously. You can track repair progress by * subscribing JMX notification sent from this StorageServiceMBean. @@ -747,6 +795,24 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, */ @Override public int repairAsync(String keyspace, Map options) { + return repairAsync(keyspace, options, false); + } + + @SuppressWarnings("unused") + private static final String PARALLELISM_KEY = "parallelism"; + private static final String PRIMARY_RANGE_KEY = "primaryRange"; + @SuppressWarnings("unused") + private static final String INCREMENTAL_KEY = "incremental"; + @SuppressWarnings("unused") + private static final String JOB_THREADS_KEY = "jobThreads"; + private static final String RANGES_KEY = "ranges"; + private static final String COLUMNFAMILIES_KEY = "columnFamilies"; + private static final String DATACENTERS_KEY = "dataCenters"; + private static final String HOSTS_KEY = "hosts"; + @SuppressWarnings("unused") + private static final String TRACE_KEY = "trace"; + + private int repairAsync(String keyspace, Map options, boolean legacy) { log(" repairAsync(String keyspace, Map options)"); MultivaluedMap queryParams = new MultivaluedHashMap(); for (String op : options.keySet()) { @@ -754,38 +820,66 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, } int cmd = client.postInt("/storage_service/repair_async/" + keyspace, queryParams); - waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true)); + waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true), + legacy); return cmd; } + private static String commaSeparated(Collection c) { + String s = c.toString(); + return s.substring(1, s.length() - 1); + } + + private int repairRangeAsync(String beginToken, String endToken, String keyspaceName, Boolean isSequential, + Collection dataCenters, Collection hosts, Boolean primaryRange, Boolean repairedAt, + String... columnFamilies) { + log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) throws IOException"); + + Map options = new HashMap(); + if (beginToken != null && endToken != null) { + options.put(RANGES_KEY, beginToken + ":" + endToken); + } + if (dataCenters != null) { + options.put(DATACENTERS_KEY, commaSeparated(dataCenters)); + } + if (hosts != null) { + options.put(HOSTS_KEY, commaSeparated(hosts)); + } + if (columnFamilies != null && columnFamilies.length != 0) { + options.put(COLUMNFAMILIES_KEY, commaSeparated(asList(columnFamilies))); + } + if (primaryRange != null) { + options.put(PRIMARY_RANGE_KEY, primaryRange.toString()); + } + + return repairAsync(keyspaceName, options, true); + } + @Override + @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) - throws IOException { + throws IOException { log(" forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException"); - Map options = new HashMap(); - return repairAsync(keyspace, options); - } - - public int forceRepairAsync(String keyspace) { - Map options = new HashMap(); - return repairAsync(keyspace, options); + return repairRangeAsync(null, null, keyspace, isSequential, dataCenters, hosts, primaryRange, repairedAt, + columnFamilies); } @Override + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, - Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) - throws IOException { + Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) throws IOException"); - return client.getIntValue(""); + return repairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, hosts, null, repairedAt, + columnFamilies); } @Override - public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, + @Deprecated + public int forceRepairAsync(String keyspaceName, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); - Map options = new HashMap(); - return repairAsync(keyspace, options); + return repairRangeAsync(null, null, keyspaceName, isSequential, null, null, primaryRange, null, columnFamilies); } @Override @@ -793,7 +887,8 @@ public class StorageService extends MetricsMBean implements StorageServiceMBean, public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies) { log(" forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies)"); - return client.getIntValue(""); + return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, null, null, repairedAt, + columnFamilies); } @Override