diff --git a/src/main/java/com/cloudius/urchin/api/APIClient.java b/src/main/java/com/cloudius/urchin/api/APIClient.java index bbb9171..ccd397e 100644 --- a/src/main/java/com/cloudius/urchin/api/APIClient.java +++ b/src/main/java/com/cloudius/urchin/api/APIClient.java @@ -77,6 +77,21 @@ public class APIClient { post(path, null); } + public String postGetVal(String path, MultivaluedMap queryParams) { + if (queryParams != null) { + return get(path, queryParams).post(String.class); + } + return get(path).post(String.class); + } + + public int postInt(String path, MultivaluedMap queryParams) { + return Integer.parseInt(postGetVal(path, queryParams)); + } + + public int postInt(String path) { + return postInt(path, null); + } + public void delete(String path, MultivaluedMap queryParams) { if (queryParams != null) { get(path, queryParams).delete(); diff --git a/src/main/java/org/apache/cassandra/repair/RepairParallelism.java b/src/main/java/org/apache/cassandra/repair/RepairParallelism.java index 4b7f6ef..c68a5bb 100644 --- a/src/main/java/org/apache/cassandra/repair/RepairParallelism.java +++ b/src/main/java/org/apache/cassandra/repair/RepairParallelism.java @@ -22,6 +22,19 @@ */ package org.apache.cassandra.repair; -public class RepairParallelism { +public enum RepairParallelism { + /** + * One node at a time + */ + SEQUENTIAL, + /** + * All nodes at the same time + */ + PARALLEL, + + /** + * One node per data center at a time + */ + DATACENTER_AWARE } diff --git a/src/main/java/org/apache/cassandra/service/StorageService.java b/src/main/java/org/apache/cassandra/service/StorageService.java index d6521c6..8cc3aba 100644 --- a/src/main/java/org/apache/cassandra/service/StorageService.java +++ b/src/main/java/org/apache/cassandra/service/StorageService.java @@ -29,6 +29,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import javax.management.*; import javax.management.openmbean.TabularData; @@ -36,7 +37,6 @@ import javax.ws.rs.core.MultivaluedMap; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.repair.RepairParallelism; - import com.cloudius.urchin.api.APIClient; import com.sun.jersey.core.util.MultivaluedMapImpl; @@ -51,7 +51,7 @@ public class StorageService extends NotificationBroadcasterSupport .getLogger(StorageService.class.getName()); private APIClient c = new APIClient(); - + private static Timer timer = new Timer("Storage Service Repair"); private StorageMetrics metrics = new StorageMetrics(); public static final StorageService instance = new StorageService(); @@ -60,6 +60,14 @@ public class StorageService extends NotificationBroadcasterSupport return instance; } + public static enum RepairStatus + { + STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED + } + + /* JMX notification serial number counter */ + private final AtomicLong notificationSerialNumber = new AtomicLong(); + private final ObjectName jmxObjectName; public StorageService() { @@ -513,6 +521,72 @@ public class StorageService extends NotificationBroadcasterSupport c.post("/storage_service/keyspace_flush/" + keyspaceName, queryParams); } + class CheckRepair extends TimerTask { + private APIClient c = new APIClient(); + int id; + String keyspace; + String message; + MultivaluedMap queryParams = new MultivaluedMapImpl(); + int cmd; + public CheckRepair(int id, String keyspace) { + this.id = id; + this.keyspace = keyspace; + APIClient.set_query_param(queryParams, "id", Integer.toString(id)); + message = String.format("Repair session %d ", id); + // The returned id is the command number + this.cmd = id; + } + @Override + public void run() { + String status = c.getStringValue("/storage_service/repair_async/" + keyspace, queryParams); + if (!status.equals("RUNNING")) { + cancel(); + if (!status.equals("SUCCESSFUL")) { + sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.SESSION_SUCCESS.ordinal()}); + } else { + sendNotification("repair", message + "failed", new int[]{cmd, RepairStatus.SESSION_FAILED.ordinal()}); + } + sendNotification("repair", message + "finished", new int[]{cmd, RepairStatus.FINISHED.ordinal()}); + } + } + + } + + /** + * Sends JMX notification to subscribers. + * + * @param type Message type + * @param message Message itself + * @param userObject Arbitrary object to attach to notification + */ + public void sendNotification(String type, String message, Object userObject) + { + Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message); + jmxNotification.setUserData(userObject); + sendNotification(jmxNotification); + } + + public String getRepairMessage(final int cmd, + final String keyspace, + final int ranges_size, + final RepairParallelism parallelismDegree, + final boolean fullRepair) { + return String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", + cmd, ranges_size, keyspace, parallelismDegree, fullRepair); + } + + /** + * + * @param repair + */ + public int waitAndNotifyRepair(int cmd, String keyspace, String message) { + logger.info(message); + sendNotification("repair", message, new int[]{cmd, RepairStatus.STARTED.ordinal()}); + TimerTask taskToExecute = new CheckRepair(cmd, keyspace); + timer.schedule(taskToExecute, 100, 1000); + return cmd; + } + /** * Invoke repair asynchronously. You can track repair progress by * subscribing JMX notification sent from this StorageServiceMBean. @@ -536,28 +610,26 @@ public class StorageService extends NotificationBroadcasterSupport opts = opts + op + "=" + options.get(op); } APIClient.set_query_param(queryParams, "options", opts); - return c.getIntValue("/storage_service/repair_async/" + keyspace); + int cmd = c.postInt("/storage_service/repair_async/" + keyspace); + waitAndNotifyRepair(cmd, keyspace, getRepairMessage(cmd, keyspace, 1, RepairParallelism.SEQUENTIAL, true)); + return cmd; } - @Deprecated + @Override public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException { log(" forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } - @Deprecated - public int forceRepairAsync(String keyspace, - RepairParallelism parallelismDegree, Collection dataCenters, - Collection hosts, boolean primaryRange, boolean fullRepair, - String... columnFamilies) { - log(" forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); - return c.getIntValue(""); + public int forceRepairAsync(String keyspace) { + Map options = new HashMap(); + return repairAsync(keyspace, options); } - @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, @@ -575,12 +647,13 @@ public class StorageService extends NotificationBroadcasterSupport return c.getIntValue(""); } - @Deprecated + @Override public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) { log(" forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } @Deprecated @@ -1146,7 +1219,8 @@ public class StorageService extends NotificationBroadcasterSupport String... columnFamilies) { // TODO Auto-generated method stub log(" forceRepairAsync()"); - return c.getIntValue(""); + Map options = new HashMap(); + return repairAsync(keyspace, options); } @Override diff --git a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java index 77709ec..9acbbc8 100644 --- a/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/main/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -351,6 +351,7 @@ public interface StorageServiceMBean extends NotificationEmitter { Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies); + public int forceRepairAsync(String keyspace); /** * Same as forceRepairAsync, but handles a specified range */