diff --git a/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java new file mode 100644 index 0000000..12c83c8 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/ProgressInfo.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.io.Serializable; +import java.net.InetAddress; + +import com.google.common.base.Objects; + +/** + * ProgressInfo contains file transfer progress. + */ +public class ProgressInfo implements Serializable +{ + /** + * Direction of the stream. + */ + public static enum Direction + { + OUT(0), + IN(1); + + public final byte code; + + private Direction(int code) + { + this.code = (byte) code; + } + + public static Direction fromByte(byte direction) + { + return direction == 0 ? OUT : IN; + } + } + + public final InetAddress peer; + public final int sessionIndex; + public final String fileName; + public final Direction direction; + public final long currentBytes; + public final long totalBytes; + + public ProgressInfo(InetAddress peer, int sessionIndex, String fileName, Direction direction, long currentBytes, long totalBytes) + { + assert totalBytes > 0; + + this.peer = peer; + this.sessionIndex = sessionIndex; + this.fileName = fileName; + this.direction = direction; + this.currentBytes = currentBytes; + this.totalBytes = totalBytes; + } + + /** + * @return true if file transfer is completed + */ + public boolean isCompleted() + { + return currentBytes >= totalBytes; + } + + /** + * ProgressInfo is considered to be equal only when all attributes except currentBytes are equal. + */ + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ProgressInfo that = (ProgressInfo) o; + + if (totalBytes != that.totalBytes) return false; + if (direction != that.direction) return false; + if (!fileName.equals(that.fileName)) return false; + if (sessionIndex != that.sessionIndex) return false; + return peer.equals(that.peer); + } + + @Override + public int hashCode() + { + return Objects.hashCode(peer, sessionIndex, fileName, direction, totalBytes); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(fileName); + sb.append(" ").append(currentBytes); + sb.append("/").append(totalBytes).append(" bytes"); + sb.append("(").append(currentBytes*100/totalBytes).append("%) "); + sb.append(direction == Direction.OUT ? "sent to " : "received from "); + sb.append("idx:").append(sessionIndex); + sb.append(peer); + return sb.toString(); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/SessionInfo.java b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java new file mode 100644 index 0000000..5afe7c6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/SessionInfo.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import javax.json.JsonArray; +import javax.json.JsonObject; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +/** + * Stream session info. + */ +public final class SessionInfo implements Serializable +{ + public final InetAddress peer; + public final int sessionIndex; + public final InetAddress connecting; + /** Immutable collection of receiving summaries */ + public final Collection receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection sendingSummaries; + /** Current session state */ + public final StreamSession.State state; + + private final Map receivingFiles; + private final Map sendingFiles; + + static InetAddress address(String val) { + try { + return InetAddress.getByName(val); + } catch (UnknownHostException e) { + } + return null; + } + + + public SessionInfo(InetAddress peer, + int sessionIndex, + InetAddress connecting, + Collection receivingSummaries, + Collection sendingSummaries, + StreamSession.State state) { + this.peer = peer; + this.sessionIndex = sessionIndex; + this.connecting = connecting; + this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries); + this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries); + this.receivingFiles = new ConcurrentHashMap<>(); + this.sendingFiles = new ConcurrentHashMap<>(); + this.state = state; + } + + public SessionInfo(String peer, + int sessionIndex, + String connecting, + Collection receivingSummaries, + Collection sendingSummaries, + String state) { + this(address(peer), sessionIndex, address(connecting), receivingSummaries, sendingSummaries, + StreamSession.State.valueOf(state)); + } + + public static SessionInfo fromJsonObject(JsonObject obj) { + return new SessionInfo(obj.getString("peer"), obj.getInt("sessionIndex"), + obj.getString("connecting"), + StreamSummary.fromJsonArr(obj.getJsonArray("receivingSummaries")), + StreamSummary.fromJsonArr(obj.getJsonArray("sendingSummaries")), + obj.getString("state")); + } + + public static Set fromJsonArr(JsonArray arr) { + Set res = new HashSet(); + if (arr != null) { + for (int i = 0; i < arr.size(); i++) { + res.add(fromJsonObject(arr.getJsonObject(i))); + } + } + return res; + } + + public boolean isFailed() + { + return state == StreamSession.State.FAILED; + } + + /** + * Update progress of receiving/sending file. + * + * @param newProgress new progress info + */ + public void updateProgress(ProgressInfo newProgress) + { + assert peer.equals(newProgress.peer); + + Map currentFiles = newProgress.direction == ProgressInfo.Direction.IN + ? receivingFiles : sendingFiles; + currentFiles.put(newProgress.fileName, newProgress); + } + + public Collection getReceivingFiles() + { + return receivingFiles.values(); + } + + public Collection getSendingFiles() + { + return sendingFiles.values(); + } + + /** + * @return total number of files already received. + */ + public long getTotalFilesReceived() + { + return getTotalFilesCompleted(receivingFiles.values()); + } + + /** + * @return total number of files already sent. + */ + public long getTotalFilesSent() + { + return getTotalFilesCompleted(sendingFiles.values()); + } + + /** + * @return total size(in bytes) already received. + */ + public long getTotalSizeReceived() + { + return getTotalSizeInProgress(receivingFiles.values()); + } + + /** + * @return total size(in bytes) already sent. + */ + public long getTotalSizeSent() + { + return getTotalSizeInProgress(sendingFiles.values()); + } + + /** + * @return total number of files to receive in the session + */ + public long getTotalFilesToReceive() + { + return getTotalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long getTotalFilesToSend() + { + return getTotalFiles(sendingSummaries); + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long getTotalSizeToReceive() + { + return getTotalSizes(receivingSummaries); + } + + /** + * @return total size(in bytes) to send in the session + */ + public long getTotalSizeToSend() + { + return getTotalSizes(sendingSummaries); + } + + private long getTotalSizeInProgress(Collection files) + { + long total = 0; + for (ProgressInfo file : files) + total += file.currentBytes; + return total; + } + + private long getTotalFiles(Collection summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } + + private long getTotalSizes(Collection summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + private long getTotalFilesCompleted(Collection files) + { + Iterable completed = Iterables.filter(files, new Predicate() + { + public boolean apply(ProgressInfo input) + { + return input.isCompleted(); + } + }); + return Iterables.size(completed); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/StreamManager.java b/src/main/java/org/apache/cassandra/streaming/StreamManager.java new file mode 100644 index 0000000..89f7d1b --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/StreamManager.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.util.HashSet; +import java.util.Set; + +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.management.ListenerNotFoundException; +import javax.management.MBeanNotificationInfo; +import javax.management.NotificationFilter; +import javax.management.NotificationListener; +import javax.management.openmbean.CompositeData; + +import org.apache.cassandra.streaming.management.StreamStateCompositeData; + +import com.cloudius.urchin.api.APIClient; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * StreamManager manages currently running {@link StreamResultFuture}s and + * provides status of all operation invoked. + * + * All stream operation should be created through this class to track streaming + * status and progress. + */ +public class StreamManager implements StreamManagerMBean { + public static final StreamManager instance = new StreamManager(); + private static final java.util.logging.Logger logger = java.util.logging.Logger + .getLogger(StreamManager.class.getName()); + private APIClient c = new APIClient(); + + public Set getState() { + JsonArray arr = c.getJsonArray("/stream_manager/"); + Set res = new HashSet(); + for (int i = 0; i < arr.size(); i++) { + JsonObject obj = arr.getJsonObject(i); + res.add(new StreamState(obj.getString("plan_id"), obj.getString("description"), SessionInfo.fromJsonArr(obj.getJsonArray("sessions")))); + } + return res; + } + + public static StreamManager getInstance() { + return instance; + } + public Set getCurrentStreams() { + logger.info("getCurrentStreams"); + return Sets.newHashSet(Iterables.transform(getState(), new Function() + { + public CompositeData apply(StreamState input) + { + return StreamStateCompositeData.toCompositeData(input); + } + })); + } + + @Override + public void removeNotificationListener(NotificationListener arg0, + NotificationFilter arg1, Object arg2) + throws ListenerNotFoundException { + // TODO Auto-generated method stub + + } + + @Override + public void addNotificationListener(NotificationListener arg0, + NotificationFilter arg1, Object arg2) + throws IllegalArgumentException { + // TODO Auto-generated method stub + + } + + @Override + public MBeanNotificationInfo[] getNotificationInfo() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void removeNotificationListener(NotificationListener arg0) + throws ListenerNotFoundException { + // TODO Auto-generated method stub + + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java new file mode 100644 index 0000000..28b25db --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/StreamManagerMBean.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.util.Set; +import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; + +public interface StreamManagerMBean extends NotificationEmitter { + public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager"; + + /** + * Returns the current state of all ongoing streams. + */ + Set getCurrentStreams(); +} diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSession.java b/src/main/java/org/apache/cassandra/streaming/StreamSession.java new file mode 100644 index 0000000..7646dc6 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/StreamSession.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +/** + * Handles the streaming a one or more section of one of more sstables to and from a specific + * remote node. + * + * Both this node and the remote one will create a similar symmetrical StreamSession. A streaming + * session has the following life-cycle: + * + * 1. Connections Initialization + * + * (a) A node (the initiator in the following) create a new StreamSession, initialize it (init()) + * and then start it (start()). Start will create a {@link ConnectionHandler} that will create + * two connections to the remote node (the follower in the following) with whom to stream and send + * a StreamInit message. The first connection will be the incoming connection for the + * initiator, and the second connection will be the outgoing. + * (b) Upon reception of that StreamInit message, the follower creates its own StreamSession, + * initialize it if it still does not exist, and attach connecting socket to its ConnectionHandler + * according to StreamInit message's isForOutgoing flag. + * (d) When the both incoming and outgoing connections are established, StreamSession calls + * StreamSession#onInitializationComplete method to start the streaming prepare phase + * (StreamResultFuture.startStreaming()). + * + * 2. Streaming preparation phase + * + * (a) This phase is started when the initiator onInitializationComplete() method is called. This method sends a + * PrepareMessage that includes what files/sections this node will stream to the follower + * (stored in a StreamTransferTask, each column family has it's own transfer task) and what + * the follower needs to stream back (StreamReceiveTask, same as above). If the initiator has + * nothing to receive from the follower, it goes directly to its Streaming phase. Otherwise, + * it waits for the follower PrepareMessage. + * (b) Upon reception of the PrepareMessage, the follower records which files/sections it will receive + * and send back its own PrepareMessage with a summary of the files/sections that will be sent to + * the initiator (prepare()). After having sent that message, the follower goes to its Streamning + * phase. + * (c) When the initiator receives the follower PrepareMessage, it records which files/sections it will + * receive and then goes to his own Streaming phase. + * + * 3. Streaming phase + * + * (a) The streaming phase is started by each node (the sender in the follower, but note that each side + * of the StreamSession may be sender for some of the files) involved by calling startStreamingFiles(). + * This will sequentially send a FileMessage for each file of each SteamTransferTask. Each FileMessage + * consists of a FileMessageHeader that indicates which file is coming and then start streaming the + * content for that file (StreamWriter in FileMessage.serialize()). When a file is fully sent, the + * fileSent() method is called for that file. If all the files for a StreamTransferTask are sent + * (StreamTransferTask.complete()), the task is marked complete (taskCompleted()). + * (b) On the receiving side, a SSTable will be written for the incoming file (StreamReader in + * FileMessage.deserialize()) and once the FileMessage is fully received, the file will be marked as + * complete (received()). When all files for the StreamReceiveTask have been received, the sstables + * are added to the CFS (and 2ndary index are built, StreamReceiveTask.complete()) and the task + * is marked complete (taskCompleted()) + * (b) If during the streaming of a particular file an I/O error occurs on the receiving end of a stream + * (FileMessage.deserialize), the node will retry the file (up to DatabaseDescriptor.getMaxStreamingRetries()) + * by sending a RetryMessage to the sender. On receiving a RetryMessage, the sender simply issue a new + * FileMessage for that file. + * (c) When all transfer and receive tasks for a session are complete, the move to the Completion phase + * (maybeCompleted()). + * + * 4. Completion phase + * + * (a) When a node has finished all transfer and receive task, it enter the completion phase (maybeCompleted()). + * If it had already received a CompleteMessage from the other side (it is in the WAIT_COMPLETE state), that + * session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and + * send a CompleteMessage to the other side. + */ +public class StreamSession +{ + + public static enum State + { + INITIALIZED, + PREPARING, + STREAMING, + WAIT_COMPLETE, + COMPLETE, + FAILED, + } + + +} diff --git a/src/main/java/org/apache/cassandra/streaming/StreamState.java b/src/main/java/org/apache/cassandra/streaming/StreamState.java new file mode 100644 index 0000000..fbaa3fb --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/StreamState.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.io.Serializable; +import java.util.Set; +import java.util.UUID; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +/** + * Current snapshot of streaming progress. + */ +public class StreamState implements Serializable +{ + /** + * + */ + private static final long serialVersionUID = 1L; + public final UUID planId; + public final String description; + public final Set sessions; + + public StreamState(UUID planId, String description, Set sessions) { + this.planId = planId; + this.description = description; + this.sessions = sessions; + } + public StreamState(String planId, String description, Set sessions) + { + this(UUID.fromString(planId), description, sessions); + } + + public boolean hasFailedSession() + { + return Iterables.any(sessions, new Predicate() + { + public boolean apply(SessionInfo session) + { + return session.isFailed(); + } + }); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/StreamSummary.java b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java new file mode 100644 index 0000000..b70a815 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/StreamSummary.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; + +import javax.json.JsonArray; +import javax.json.JsonObject; + +import com.google.common.base.Objects; + +/** + * Summary of streaming. + */ +public class StreamSummary +{ + public final UUID cfId; + + /** + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. + */ + public final int files; + public final long totalSize; + + public StreamSummary(UUID cfId, int files, long totalSize) + { + this.cfId = cfId; + this.files = files; + this.totalSize = totalSize; + } + + public StreamSummary(String cfId, int files, long totalSize) { + this(UUID.fromString(cfId), files, totalSize); + } + + public static StreamSummary fromJsonObject(JsonObject obj) { + return new StreamSummary(obj.getString("cf_id"), obj.getInt("files"), obj.getJsonNumber("total_size").longValue()); + } + + public static Collection fromJsonArr(JsonArray arr) { + Collection res = new ArrayList(); + for (int i = 0; i < arr.size(); i++) { + res.add(fromJsonObject(arr.getJsonObject(i))); + } + return res; + } + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamSummary summary = (StreamSummary) o; + return files == summary.files && totalSize == summary.totalSize && cfId.equals(summary.cfId); + } + + @Override + public int hashCode() + { + return Objects.hashCode(cfId, files, totalSize); + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder("StreamSummary{"); + sb.append("path=").append(cfId); + sb.append(", files=").append(files); + sb.append(", totalSize=").append(totalSize); + sb.append('}'); + return sb.toString(); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java new file mode 100644 index 0000000..b722c13 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming.management; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import javax.management.openmbean.*; + +import com.google.common.base.Throwables; + +import org.apache.cassandra.streaming.ProgressInfo; + +public class ProgressInfoCompositeData +{ + private static final String[] ITEM_NAMES = new String[]{"planId", + "peer", + "sessionIndex", + "fileName", + "direction", + "currentBytes", + "totalBytes"}; + private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID", + "Session peer", + "Index of session", + "Name of the file", + "Direction('IN' or 'OUT')", + "Current bytes transferred", + "Total bytes to transfer"}; + private static final OpenType[] ITEM_TYPES = new OpenType[]{SimpleType.STRING, + SimpleType.STRING, + SimpleType.INTEGER, + SimpleType.STRING, + SimpleType.STRING, + SimpleType.LONG, + SimpleType.LONG}; + + public static final CompositeType COMPOSITE_TYPE; + static { + try + { + COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(), + "ProgressInfo", + ITEM_NAMES, + ITEM_DESCS, + ITEM_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo) + { + Map valueMap = new HashMap<>(); + valueMap.put(ITEM_NAMES[0], planId.toString()); + valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress()); + valueMap.put(ITEM_NAMES[2], progressInfo.sessionIndex); + valueMap.put(ITEM_NAMES[3], progressInfo.fileName); + valueMap.put(ITEM_NAMES[4], progressInfo.direction.name()); + valueMap.put(ITEM_NAMES[5], progressInfo.currentBytes); + valueMap.put(ITEM_NAMES[6], progressInfo.totalBytes); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static ProgressInfo fromCompositeData(CompositeData cd) + { + Object[] values = cd.getAll(ITEM_NAMES); + try + { + return new ProgressInfo(InetAddress.getByName((String) values[1]), + (int) values[2], + (String) values[3], + ProgressInfo.Direction.valueOf((String)values[4]), + (long) values[5], + (long) values[6]); + } + catch (UnknownHostException e) + { + throw Throwables.propagate(e); + } + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java new file mode 100644 index 0000000..eb1414f --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming.management; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.*; +import javax.management.openmbean.*; + +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamSummary; + +public class SessionInfoCompositeData +{ + private static final String[] ITEM_NAMES = new String[]{"planId", + "peer", + "connecting", + "receivingSummaries", + "sendingSummaries", + "state", + "receivingFiles", + "sendingFiles", + "sessionIndex"}; + private static final String[] ITEM_DESCS = new String[]{"Plan ID", + "Session peer", + "Connecting address", + "Summaries of receiving data", + "Summaries of sending data", + "Current session state", + "Receiving files", + "Sending files", + "Session index"}; + private static final OpenType[] ITEM_TYPES; + + public static final CompositeType COMPOSITE_TYPE; + static { + try + { + ITEM_TYPES = new OpenType[]{SimpleType.STRING, + SimpleType.STRING, + SimpleType.STRING, + ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), + ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE), + SimpleType.STRING, + ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), + ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE), + SimpleType.INTEGER}; + COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(), + "SessionInfo", + ITEM_NAMES, + ITEM_DESCS, + ITEM_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo) + { + Map valueMap = new HashMap<>(); + valueMap.put(ITEM_NAMES[0], planId.toString()); + valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress()); + valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress()); + Function fromStreamSummary = new Function() + { + public CompositeData apply(StreamSummary input) + { + return StreamSummaryCompositeData.toCompositeData(input); + } + }; + valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[4], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary)); + valueMap.put(ITEM_NAMES[5], sessionInfo.state.name()); + Function fromProgressInfo = new Function() + { + public CompositeData apply(ProgressInfo input) + { + return ProgressInfoCompositeData.toCompositeData(planId, input); + } + }; + valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[7], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo)); + valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static SessionInfo fromCompositeData(CompositeData cd) + { + assert cd.getCompositeType().equals(COMPOSITE_TYPE); + + Object[] values = cd.getAll(ITEM_NAMES); + InetAddress peer, connecting; + try + { + peer = InetAddress.getByName((String) values[1]); + connecting = InetAddress.getByName((String) values[2]); + } + catch (UnknownHostException e) + { + throw Throwables.propagate(e); + } + Function toStreamSummary = new Function() + { + public StreamSummary apply(CompositeData input) + { + return StreamSummaryCompositeData.fromCompositeData(input); + } + }; + SessionInfo info = new SessionInfo(peer, + (int)values[8], + connecting, + fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary), + fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary), + StreamSession.State.valueOf((String) values[5])); + Function toProgressInfo = new Function() + { + public ProgressInfo apply(CompositeData input) + { + return ProgressInfoCompositeData.fromCompositeData(input); + } + }; + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo)) + { + info.updateProgress(progress); + } + for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo)) + { + info.updateProgress(progress); + } + return info; + } + + private static Collection fromArrayOfCompositeData(CompositeData[] cds, Function func) + { + return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func)); + } + + private static CompositeData[] toArrayOfCompositeData(Collection toConvert, Function func) + { + CompositeData[] composites = new CompositeData[toConvert.size()]; + return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java new file mode 100644 index 0000000..3f57608 --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming.management; + +import java.util.*; +import javax.management.openmbean.*; + +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.cassandra.streaming.SessionInfo; +import org.apache.cassandra.streaming.StreamState; + +/** + */ +public class StreamStateCompositeData +{ + private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions", + "currentRxBytes", "totalRxBytes", "rxPercentage", + "currentTxBytes", "totalTxBytes", "txPercentage"}; + private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream", + "Stream plan description", + "Active stream sessions", + "Number of bytes received across all streams", + "Total bytes available to receive across all streams", + "Percentage received across all streams", + "Number of bytes sent across all streams", + "Total bytes available to send across all streams", + "Percentage sent across all streams"}; + private static final OpenType[] ITEM_TYPES; + + public static final CompositeType COMPOSITE_TYPE; + static { + try + { + ITEM_TYPES = new OpenType[]{SimpleType.STRING, + SimpleType.STRING, + ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE), + SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE, + SimpleType.LONG, SimpleType.LONG, SimpleType.DOUBLE}; + COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(), + "StreamState", + ITEM_NAMES, + ITEM_DESCS, + ITEM_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static CompositeData toCompositeData(final StreamState streamState) + { + Map valueMap = new HashMap<>(); + valueMap.put(ITEM_NAMES[0], streamState.planId.toString()); + valueMap.put(ITEM_NAMES[1], streamState.description); + + CompositeData[] sessions = new CompositeData[streamState.sessions.size()]; + Lists.newArrayList(Iterables.transform(streamState.sessions, new Function() + { + public CompositeData apply(SessionInfo input) + { + return SessionInfoCompositeData.toCompositeData(streamState.planId, input); + } + })).toArray(sessions); + valueMap.put(ITEM_NAMES[2], sessions); + + long currentRxBytes = 0; + long totalRxBytes = 0; + long currentTxBytes = 0; + long totalTxBytes = 0; + for (SessionInfo sessInfo : streamState.sessions) + { + currentRxBytes += sessInfo.getTotalSizeReceived(); + totalRxBytes += sessInfo.getTotalSizeToReceive(); + currentTxBytes += sessInfo.getTotalSizeSent(); + totalTxBytes += sessInfo.getTotalSizeToSend(); + } + double rxPercentage = (totalRxBytes == 0 ? 100L : currentRxBytes * 100L / totalRxBytes); + double txPercentage = (totalTxBytes == 0 ? 100L : currentTxBytes * 100L / totalTxBytes); + + valueMap.put(ITEM_NAMES[3], currentRxBytes); + valueMap.put(ITEM_NAMES[4], totalRxBytes); + valueMap.put(ITEM_NAMES[5], rxPercentage); + valueMap.put(ITEM_NAMES[6], currentTxBytes); + valueMap.put(ITEM_NAMES[7], totalTxBytes); + valueMap.put(ITEM_NAMES[8], txPercentage); + + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static StreamState fromCompositeData(CompositeData cd) + { + assert cd.getCompositeType().equals(COMPOSITE_TYPE); + Object[] values = cd.getAll(ITEM_NAMES); + UUID planId = UUID.fromString((String) values[0]); + String description = (String) values[1]; + Set sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]), + new Function() + { + public SessionInfo apply(CompositeData input) + { + return SessionInfoCompositeData.fromCompositeData(input); + } + })); + return new StreamState(planId, description, sessions); + } +} diff --git a/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java new file mode 100644 index 0000000..d649aff --- /dev/null +++ b/src/main/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Copyright 2015 ScyllaDB + * + * Modified by ScyllaDB + */ + +package org.apache.cassandra.streaming.management; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import javax.management.openmbean.*; + +import com.google.common.base.Throwables; + +import org.apache.cassandra.streaming.StreamSummary; + +/** + */ +public class StreamSummaryCompositeData +{ + private static final String[] ITEM_NAMES = new String[]{"cfId", + "files", + "totalSize"}; + private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID", + "Number of files", + "Total bytes of the files"}; + private static final OpenType[] ITEM_TYPES = new OpenType[]{SimpleType.STRING, + SimpleType.INTEGER, + SimpleType.LONG}; + + public static final CompositeType COMPOSITE_TYPE; + static { + try + { + COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(), + "StreamSummary", + ITEM_NAMES, + ITEM_DESCS, + ITEM_TYPES); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static CompositeData toCompositeData(StreamSummary streamSummary) + { + Map valueMap = new HashMap<>(); + valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString()); + valueMap.put(ITEM_NAMES[1], streamSummary.files); + valueMap.put(ITEM_NAMES[2], streamSummary.totalSize); + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, valueMap); + } + catch (OpenDataException e) + { + throw Throwables.propagate(e); + } + } + + public static StreamSummary fromCompositeData(CompositeData cd) + { + Object[] values = cd.getAll(ITEM_NAMES); + return new StreamSummary(UUID.fromString((String) values[0]), + (int) values[1], + (long) values[2]); + } +}