From 6fc2c5c72056d21252be5cd762e8a0e9af1ac08c Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 27 Aug 2015 20:44:24 +0300 Subject: [PATCH 1/3] APIClient add postInt Sometimes a post command need to return, this adds a method to perform a post command that returns an int value. The general postGetVal, can be used for other types if needed. Signed-off-by: Amnon Heiman --- .../java/com/cloudius/urchin/api/APIClient.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/java/com/cloudius/urchin/api/APIClient.java b/src/main/java/com/cloudius/urchin/api/APIClient.java index cddeebd..a9168fb 100644 --- a/src/main/java/com/cloudius/urchin/api/APIClient.java +++ b/src/main/java/com/cloudius/urchin/api/APIClient.java @@ -75,6 +75,21 @@ public class APIClient { post(path, null); } + public String postGetVal(String path, MultivaluedMap queryParams) { + if (queryParams != null) { + return get(path, queryParams).post(String.class); + } + return get(path).post(String.class); + } + + public int postInt(String path, MultivaluedMap queryParams) { + return Integer.parseInt(postGetVal(path, queryParams)); + } + + public int postInt(String path) { + return postInt(path, null); + } + public void delete(String path, MultivaluedMap queryParams) { if (queryParams != null) { get(path, queryParams).delete(); From e96e62b6bde5ad61ce5b96a1f5a8c136cfab50aa Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 13 Aug 2015 09:52:06 +0300 Subject: [PATCH 2/3] Add the RepairParallelism enum This takes the RepairParallelism enum from origin Signed-off-by: Amnon Heiman --- .../cassandra/repair/RepairParallelism.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/cassandra/repair/RepairParallelism.java b/src/main/java/org/apache/cassandra/repair/RepairParallelism.java index 4b7f6ef..c68a5bb 100644 --- a/src/main/java/org/apache/cassandra/repair/RepairParallelism.java +++ b/src/main/java/org/apache/cassandra/repair/RepairParallelism.java @@ -22,6 +22,19 @@ */ package org.apache.cassandra.repair; -public class RepairParallelism { +public enum RepairParallelism { + /** + * One node at a time + */ + SEQUENTIAL, + /** + * All nodes at the same time + */ + PARALLEL, + + /** + * One node per data center at a time + */ + DATACENTER_AWARE } From 67dca4da9d28401fa5ddb9db31071215db6369c3 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 13 Aug 2015 09:53:47 +0300 Subject: [PATCH 3/3] StorageService: Add notification suport for the repair command This patch adds a notification support for the repair in StorageService. When a repair command starts a timer is set to check the status of the repair, when the repair complets it sends notification for the successful or fail of the repair. Because jconsole doesn't run method with variable number of parameters an additional MBean method was added for async repair with only a keyspace as its parameter. All the forceAsycRepair methods are mapped to the asyncRepair metod that replaces them in 2.2, when options will be supported in the repair, it will be added to the jmx. After this patch it is possible to register in the jconsole for notification, perform a forceAsyncRepair and get the notification that the repair complets --- .../cassandra/service/StorageService.java | 106 +++++++++++++++--- .../service/StorageServiceMBean.java | 1 + 2 files changed, 91 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index b40e49b..2f1f79a 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -29,6 +29,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import javax.management.*; import javax.management.openmbean.TabularData; @@ -36,7 +37,6 @@ import javax.ws.rs.core.MultivaluedMap; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.repair.RepairParallelism; - import com.cloudius.urchin.api.APIClient; import com.sun.jersey.core.util.MultivaluedMapImpl; @@ -51,7 +51,7 @@ public class StorageService extends NotificationBroadcasterSupport .getLogger(StorageService.class.getName()); private APIClient c = new APIClient(); - + private static Timer timer = new Timer("Storage Service Repair"); private StorageMetrics metrics = new StorageMetrics(); public static final StorageService instance = new StorageService(); @@ -60,6 +60,14 @@ public class StorageService extends NotificationBroadcasterSupport return instance; } + public static enum RepairStatus + { + STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED + } + + /* JMX notification serial number counter */ + private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final ObjectName jmxObjectName; public StorageService() { @@ -513,6 +521,72 @@ public class StorageService extends NotificationBroadcasterSupport c.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams); } + class CheckRepair extends TimerTask { + private APIClient c = new APIClient(); + int id; + String keyspace; + String message; + MultivaluedMap queryParams = new MultivaluedMapImpl(); + int cmd; + public CheckRepair(int id, String keyspace) { + this.id = id; + this.keyspace = keyspace; + APIClient.set_query_param(queryParams, "id", Integer.toString(id)); + message = String.format("Repair session %d ", id); + // The returned id is the command number + this.cmd = id; + } + @Override + public void run() { + String status = c.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); + if (!status.equals("RUNNING")) { + cancel(); + if (!status.equals("SUCCESSFUL")) { + sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.SESSION_SUCCESS.ordinal()}); + } else { + sendNotification("repair", message + "failed", new int[]{cmd, RepairStatus.SESSION_FAILED.ordinal()}); + } + sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.FINISHED.ordinal()}); + } + } + + } + + /** + * Sends JMX notification to subscribers. + * + * @param type Message type + * @param message Message itself + * @param userObject Arbitrary object to attach to notification + */ + public void sendNotification(String type, String message, Object userObject) + { + Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message); + jmxNotification.setUserData(userObject); + sendNotification(jmxNotification); + } + + public String getRepairMessage(final int cmd, + final String keyspace, + final int ranges_size, + final RepairParallelism parallelismDegree, + final boolean fullRepair) { + return String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", + cmd, ranges_size, keyspace, parallelismDegree, fullRepair); + } + + /** + * + * @param repair + */ + public int waitAndNotifyRepair(int cmd, String keyspace, String message) { + logger.info(message); + sendNotification("repair", message, new int[]{cmd, RepairStatus.STARTED.ordinal()}); + TimerTask taskToExecute = new CheckRepair(cmd, keyspace); + timer.schedule(taskToExecute, 100, 1000); + return cmd; + } + /** * Invoke repair asynchronously. You can track repair progress by * subscribing JMX notification sent from this StorageServiceMBean. @@ -536,28 +610,26 @@ public class StorageService extends NotificationBroadcasterSupport opts = opts + op + "=" + options.get(op); } APIClient.set_query_param(queryParams, "options", opts); - return c.getIntValue("/storage_service/repair_async/" + keyspace); + int cmd = c.postInt("/storage_service/repair_async/" + keyspace); + waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true)); + return cmd; } - @Deprecated + @Override public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException { log(" forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } - @Deprecated - public int forceRepairAsync(String keyspace, - RepairParallelism parallelismDegree, Collection dataCenters, - Collection hosts, boolean primaryRange, boolean fullRepair, - String... columnFamilies) { - log(" forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); - return c.getIntValue(""); + public int forceRepairAsync(String keyspace) { + Map options = new HashMap(); + return repairAsync(keyspace, options); } - @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, @@ -575,12 +647,13 @@ public class StorageService extends NotificationBroadcasterSupport return c.getIntValue(""); } - @Deprecated + @Override public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } @Deprecated @@ -1144,7 +1217,8 @@ public class StorageService extends NotificationBroadcasterSupport String... columnFamilies) { // TODO Auto-generated method stub log(" forceRepairAsync()"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } @Override diff --git a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java index 77709ec..9acbbc8 100644 --- a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -351,6 +351,7 @@ public interface StorageServiceMBean extends NotificationEmitter { Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies); + public int forceRepairAsync(String keyspace); /** * Same as forceRepairAsync, but handles a specified range */