diff --git a/.gitignore b/.gitignore index 7e6d513a6..07684843e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ ivy.jar /out /.idea *.iml -/.DS_Store +*.DS_Store .settings/org.eclipse.jdt.core.prefs .settings/org.eclipse.jdt.ui.prefs /junitvmwatcher*.properties diff --git a/.travis.yml b/.travis.yml index 0434e5466..0e5b16131 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,4 +3,6 @@ jdk: #- openjdk7 - oraclejdk8 script: "ant all" -sudo: false \ No newline at end of file +sudo: false +after_success: + - bash <(curl -s https://codecov.io/bash) \ No newline at end of file diff --git a/README.md b/README.md index 570c731aa..b9fb2dd89 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -![las2peer](https://github.com/rwth-acis/las2peer/blob/master/img/logo/bitmap/las2peer-logo-128x128.png) +![las2peer](https://rwth-acis.github.io/las2peer/logo/vector/las2peer-logo.svg) + +# [![Build Status](http://layers.dbis.rwth-aachen.de/jenkins/buildStatus/icon?job=las2peer%20Core)](http://layers.dbis.rwth-aachen.de/jenkins/job/las2peer%20Core/) [![Build Status](https://travis-ci.org/rwth-acis/las2peer.svg?branch=master)](https://travis-ci.org/rwth-acis/las2peer) [![Gitter](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/rwth-acis/las2peer) las2peer is a Java-based server framework for developing and deploying services in a distributed Peer-to-Peer (P2P) environment. las2peer was developed by the Advanced Community Information Systems (ACIS) group at the Chair of Computer Science 5 (Information Systems & Databases), RWTH Aachen University, Germany. It's main focus lies on providing developers with a tool to easily develop and test their services and deploy them in a P2P network without having to rely on a centralized infrastructure. @@ -34,9 +36,6 @@ To build the las2peer jar file simply run default target: or directly ```ant jars``` -Jenkins: [![Build Status](http://layers.dbis.rwth-aachen.de/jenkins/buildStatus/icon?job=las2peer Core)](http://layers.dbis.rwth-aachen.de/jenkins/job/las2peer%20Core/) - -Travis CI: [![Build Status](https://travis-ci.org/rwth-acis/las2peer.svg?branch=master)](https://travis-ci.org/rwth-acis/las2peer) JUnit Tests ----------- diff --git a/bin/LAS2peer Node Launcher.bat b/bin/las2peer Node Launcher.bat similarity index 99% rename from bin/LAS2peer Node Launcher.bat rename to bin/las2peer Node Launcher.bat index 58095daac..98af0200a 100644 --- a/bin/LAS2peer Node Launcher.bat +++ b/bin/las2peer Node Launcher.bat @@ -7,4 +7,3 @@ cd %~dp0 java -cp %CLASSPATH% i5.las2peer.tools.L2pNodeLauncher %* pause - diff --git a/bin/LAS2peer Node Launcher.sh b/bin/las2peer Node Launcher.sh similarity index 95% rename from bin/LAS2peer Node Launcher.sh rename to bin/las2peer Node Launcher.sh index 1408b5b04..8b0cb2e0c 100755 --- a/bin/LAS2peer Node Launcher.sh +++ b/bin/las2peer Node Launcher.sh @@ -9,7 +9,7 @@ fi; if [ $(uname -o) = "Cygwin" ] then - # we're in cygwin + # we're in cygwin export COLOR_DISABLED=1 export CLASSPATH="${BASE}lib/*;${BASE}export/jars/las2peer.jar;" else diff --git a/bin/start-network.sh b/bin/start-network.sh index a77a01c38..04b4b4a8d 100755 --- a/bin/start-network.sh +++ b/bin/start-network.sh @@ -3,10 +3,18 @@ # this script is an example on how to launch a persistent network with multiple nodes at once # it assumes that you have your las2peer.jar and all other dependencies in ./lib/ +PIDFILE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/las2peer-network-screens.pid +echo "$PIDFILE" + # launch bootstrap node at first -screen -S 9085 -d -m java -cp "lib/*" i5.las2peer.tools.L2pNodeLauncher --port 9085 --node-id-seed=9085 interactive +echo "launching bootstrap node at port 14501" +screen -S 14501 -D -m java -cp "lib/*" i5.las2peer.tools.L2pNodeLauncher --port 14501 --node-id-seed 14501 interactive & +# save process id to be used in stop-network.sh later +echo $! > "$PIDFILE" # launch other network nodes -for i in {9086..9089}; do - screen -S $i -d -m java -cp "lib/*" i5.las2peer.tools.L2pNodeLauncher --port $i --bootstrap localhost:9085 --node-id-seed=$i interactive +for port in {14502..14510}; do + echo "launching node at port $port" + screen -S $port -D -m java -cp "lib/*" i5.las2peer.tools.L2pNodeLauncher --port $port --bootstrap localhost:14501 --node-id-seed $port interactive & + echo $! >> "$PIDFILE" done diff --git a/bin/stop-network.sh b/bin/stop-network.sh new file mode 100755 index 000000000..db14fb599 --- /dev/null +++ b/bin/stop-network.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# this files kills all processes listed in the pid file +# it can be used to shutdown multiple nodes at once + +PIDFILE="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/las2peer-network-screens.pid + +while read pid; do + kill $pid +done < "$PIDFILE" + +/bin/rm "$PIDFILE" diff --git a/build.xml b/build.xml index e375d4cef..41f1e9ba3 100644 --- a/build.xml +++ b/build.xml @@ -5,7 +5,7 @@ - + @@ -378,7 +378,7 @@ windowtitle="las2peer Documentation" failonerror="yes" encoding="utf8" - classpath="${lib.cp}:${tmp.classes}" + classpath="${lib.cp}:${tmp.classes}:${lib.junit}" > @@ -412,26 +412,30 @@ - + + + - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + @@ -446,7 +450,7 @@ - + @@ -461,6 +465,23 @@ + + + + + + + + + + + + + + + + + diff --git a/ivy/ivy.xml b/ivy/ivy.xml index dacc11c94..cadbcb509 100644 --- a/ivy/ivy.xml +++ b/ivy/ivy.xml @@ -13,5 +13,13 @@ + + + + + + + + diff --git a/src/main/java/i5/las2peer/api/Context.java b/src/main/java/i5/las2peer/api/Context.java index b108adb44..d402e7a80 100644 --- a/src/main/java/i5/las2peer/api/Context.java +++ b/src/main/java/i5/las2peer/api/Context.java @@ -113,8 +113,8 @@ public interface Context { * @throws StorageException */ @Deprecated - public Envelope getStoredObject(String className, String identifier) throws ArtifactNotFoundException, - StorageException; + public Envelope getStoredObject(String className, String identifier) + throws ArtifactNotFoundException, StorageException; /** * Gives access to the local node. @@ -186,8 +186,8 @@ public Envelope createEnvelope(Envelope previousVersion, Serializable content, A public Envelope createEnvelope(Envelope previousVersion, Serializable content, List readers) throws IllegalArgumentException, SerializationException, CryptoException; - public Envelope createUnencryptedEnvelope(String identifier, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException; + public Envelope createUnencryptedEnvelope(String identifier, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException; public Envelope createUnencryptedEnvelope(Envelope previousVersion, Serializable content) throws IllegalArgumentException, SerializationException, CryptoException; @@ -202,11 +202,11 @@ public void storeEnvelopeAsync(Envelope envelope, Agent author, StorageStoreResu public void fetchEnvelopeAsync(String identifier, StorageEnvelopeHandler envelopeHandler, StorageExceptionHandler exceptionHandler); - public Envelope createEnvelope(String identifier, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException; + public Envelope createEnvelope(String identifier, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException; - public Envelope createEnvelope(Envelope previousVersion, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException; + public Envelope createEnvelope(Envelope previousVersion, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException; public void storeEnvelope(Envelope envelope) throws StorageException; @@ -246,7 +246,7 @@ public Serializable invoke(String service, String method, Serializable... parame * @throws ServiceNotAvailableException If the service is temporarily not available. * @throws RemoteServiceException If the remote service throws an exception. */ - public Serializable invokeInterally(String service, String method, Serializable... parameters) + public Serializable invokeInternally(String service, String method, Serializable... parameters) throws ServiceNotFoundException, ServiceNotAvailableException, RemoteServiceException; } diff --git a/src/main/java/i5/las2peer/classLoaders/libraries/LoadedLibrary.java b/src/main/java/i5/las2peer/classLoaders/libraries/LoadedLibrary.java index 004600460..fdccb928a 100755 --- a/src/main/java/i5/las2peer/classLoaders/libraries/LoadedLibrary.java +++ b/src/main/java/i5/las2peer/classLoaders/libraries/LoadedLibrary.java @@ -18,10 +18,9 @@ */ public abstract class LoadedLibrary { - private HashSet resolvedDependencies; - private LibraryDependency[] initialDependencies; - - private LibraryIdentifier myLibrary; + private final HashSet resolvedDependencies; + private final LibraryDependency[] initialDependencies; + private final LibraryIdentifier myLibrary; /** * generates a new CL without any dependencies i.e. this ClassLoader may not use any other registered libraries @@ -29,9 +28,7 @@ public abstract class LoadedLibrary { * @param libraryIdentifier identifier of the library bound to this ClassLoader */ LoadedLibrary(String libraryIdentifier) { - super(); - - myLibrary = new LibraryIdentifier(libraryIdentifier); + this(new LibraryIdentifier(libraryIdentifier)); } /** @@ -40,9 +37,7 @@ public abstract class LoadedLibrary { * @param lib identifier of the library bound to this ClassLoader */ LoadedLibrary(LibraryIdentifier lib) { - super(); - - myLibrary = lib; + this(lib, new LibraryDependency[0]); } /** @@ -62,8 +57,9 @@ public abstract class LoadedLibrary { * @param initialDependencies array with ClassLoaders this one may use for class loading */ LoadedLibrary(LibraryIdentifier lib, LibraryDependency[] initialDependencies) { - this(lib); + this.myLibrary = lib; this.initialDependencies = initialDependencies; + resolvedDependencies = new HashSet<>(); } /** @@ -88,7 +84,7 @@ public LibraryDependency[] getDependencies() { * @param libs */ void setResolvedDependencies(LoadedLibrary[] libs) { - resolvedDependencies = new HashSet(); + resolvedDependencies.clear(); for (LoadedLibrary lib : libs) { resolvedDependencies.add(lib); } diff --git a/src/main/java/i5/las2peer/classLoaders/libraries/LoadedNetworkLibrary.java b/src/main/java/i5/las2peer/classLoaders/libraries/LoadedNetworkLibrary.java index 4403a349d..0e7f590d1 100644 --- a/src/main/java/i5/las2peer/classLoaders/libraries/LoadedNetworkLibrary.java +++ b/src/main/java/i5/las2peer/classLoaders/libraries/LoadedNetworkLibrary.java @@ -22,7 +22,6 @@ import i5.las2peer.tools.CryptoException; import i5.las2peer.tools.SerializationException; import i5.las2peer.tools.XmlTools; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * This class is stored as meta information in the network and represents a network library. All getter refer to @@ -42,7 +41,8 @@ public LoadedNetworkLibrary(NodeStorageInterface node, LibraryIdentifier lib, @Override public URL getResourceAsUrl(String resourceName) throws ResourceNotFoundException { // TODO implement get resource as URL in network library - throw new NotImplementedException(); + // return las2peer:// URL or https:// URL? + throw new ResourceNotFoundException("getResourceAsUrl not implemented, yet", getIdentifier().toString()); } @Override diff --git a/src/main/java/i5/las2peer/execution/L2pThread.java b/src/main/java/i5/las2peer/execution/L2pThread.java index e9ef94d7f..6c93771f1 100644 --- a/src/main/java/i5/las2peer/execution/L2pThread.java +++ b/src/main/java/i5/las2peer/execution/L2pThread.java @@ -200,13 +200,14 @@ public Envelope getStoredObject(long id) throws ArtifactNotFoundException, Stora } @Override - public Envelope getStoredObject(Class cls, String identifier) throws ArtifactNotFoundException, StorageException { + public Envelope getStoredObject(Class cls, String identifier) + throws ArtifactNotFoundException, StorageException { return null; } @Override - public Envelope getStoredObject(String className, String identifier) throws ArtifactNotFoundException, - StorageException { + public Envelope getStoredObject(String className, String identifier) + throws ArtifactNotFoundException, StorageException { return null; } @@ -270,8 +271,8 @@ public Envelope createEnvelope(Envelope previousVersion, Serializable content, L } @Override - public Envelope createUnencryptedEnvelope(String identifier, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException { + public Envelope createUnencryptedEnvelope(String identifier, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException { return callerContext.createUnencryptedEnvelope(identifier, content); } @@ -304,14 +305,14 @@ public void fetchEnvelopeAsync(String identifier, StorageEnvelopeHandler envelop } @Override - public Envelope createEnvelope(String identifier, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException { + public Envelope createEnvelope(String identifier, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException { return callerContext.createEnvelope(identifier, content); } @Override - public Envelope createEnvelope(Envelope previousVersion, Serializable content) throws IllegalArgumentException, - SerializationException, CryptoException { + public Envelope createEnvelope(Envelope previousVersion, Serializable content) + throws IllegalArgumentException, SerializationException, CryptoException { return callerContext.createEnvelope(previousVersion, content); } @@ -342,7 +343,7 @@ public Serializable invoke(String service, String method, Serializable... parame } @Override - public Serializable invokeInterally(String service, String method, Serializable... parameters) + public Serializable invokeInternally(String service, String method, Serializable... parameters) throws ServiceNotFoundException, ServiceNotAvailableException, RemoteServiceException { return invokeWithAgent(serviceAgent, service, method, parameters); } diff --git a/src/main/java/i5/las2peer/p2p/Node.java b/src/main/java/i5/las2peer/p2p/Node.java index 375803b37..5facad8db 100644 --- a/src/main/java/i5/las2peer/p2p/Node.java +++ b/src/main/java/i5/las2peer/p2p/Node.java @@ -1,5 +1,26 @@ package i5.las2peer.p2p; +import java.io.File; +import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.lang.reflect.InvocationTargetException; +import java.security.KeyPair; +import java.security.PublicKey; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.Vector; + +import com.sun.management.OperatingSystemMXBean; + import i5.las2peer.api.Configurable; import i5.las2peer.api.exceptions.ArtifactNotFoundException; import i5.las2peer.api.exceptions.EnvelopeAlreadyExistsException; @@ -42,32 +63,10 @@ import i5.las2peer.tools.CryptoException; import i5.las2peer.tools.CryptoTools; import i5.las2peer.tools.SerializationException; - -import java.io.File; -import java.io.Serializable; -import java.lang.management.ManagementFactory; -import java.lang.reflect.InvocationTargetException; -import java.security.KeyPair; -import java.security.PublicKey; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Hashtable; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.Vector; - import rice.pastry.NodeHandle; import rice.pastry.PastryNode; import rice.pastry.socket.SocketNodeHandle; -import com.sun.management.OperatingSystemMXBean; - /** * Base class for nodes in the las2peer environment. * @@ -144,12 +143,12 @@ public enum NodeStatus { /** * observers to be notified of all occurring events */ - private HashSet observers = new HashSet(); + private HashSet observers = new HashSet<>(); /** * contexts for local method invocation */ - private Hashtable htLocalExecutionContexts = new Hashtable(); + private Hashtable htLocalExecutionContexts = new Hashtable<>(); /** * Timer to tidy up hashtables etc (Contexts) @@ -164,20 +163,20 @@ public enum NodeStatus { /** * hashtable with all {@link i5.las2peer.security.MessageReceiver}s registered at this node */ - private Hashtable htRegisteredReceivers = new Hashtable(); + private Hashtable htRegisteredReceivers = new Hashtable<>(); /** * map with all topics and their listeners */ - private HashMap> mapTopicListeners = new HashMap>(); + private HashMap> mapTopicListeners = new HashMap<>(); /** * other direction of {@link #mapTopicListeners} */ - private HashMap> mapListenerTopics = new HashMap>(); + private HashMap> mapListenerTopics = new HashMap<>(); private L2pClassManager baseClassLoader = null; - private Hashtable htAnswerListeners = new Hashtable(); + private Hashtable htAnswerListeners = new Hashtable<>(); private static final String DEFAULT_INFORMATION_FILE = "etc/nodeInfo.xml"; private String sInformationFileName = DEFAULT_INFORMATION_FILE; @@ -271,7 +270,7 @@ public PublicKey getPublicNodeKey() { * The event for this notification. conflicts if running multiple nodes on the same machine. */ private void initStandardLogfile() { - addObserver(L2pLogger.getInstance(Node.class.getName())); + addObserver(L2pLogger.getInstance(Node.class)); } /** @@ -298,8 +297,8 @@ public void removeObserver(NodeObserver observer) { * @param service The service that should be monitored. */ public void setServiceMonitoring(ServiceAgent service) { - observerNotice(Event.SERVICE_ADD_TO_MONITORING, this.getNodeId(), service.getId(), null, null, service - .getServiceNameVersion().toString()); + observerNotice(Event.SERVICE_ADD_TO_MONITORING, this.getNodeId(), service.getId(), null, null, + service.getServiceNameVersion().toString()); } /** @@ -570,7 +569,7 @@ public synchronized void shutDown() { break; } } - htRegisteredReceivers = new Hashtable(); + htRegisteredReceivers = new Hashtable<>(); } /** @@ -583,8 +582,8 @@ public synchronized void shutDown() { * @throws AgentException any problem with the agent itself (probably on calling * {@link i5.las2peer.security.Agent#notifyRegistrationTo} */ - public void registerReceiver(MessageReceiver receiver) throws AgentAlreadyRegisteredException, - L2pSecurityException, AgentException { + public void registerReceiver(MessageReceiver receiver) + throws AgentAlreadyRegisteredException, L2pSecurityException, AgentException { // TODO allow multiple mediators registered at the same time for one agent to avoid conflicts between connectors @@ -833,8 +832,8 @@ public abstract void sendMessage(Message message, Object atNodeId, MessageResult * @throws NodeNotFoundException * @throws L2pSecurityException */ - public void sendResponse(Message message, Object atNodeId) throws AgentNotKnownException, NodeNotFoundException, - L2pSecurityException { + public void sendResponse(Message message, Object atNodeId) + throws AgentNotKnownException, NodeNotFoundException, L2pSecurityException { sendMessage(message, atNodeId, null); } @@ -1059,7 +1058,7 @@ public Agent getLocalAgent(long id) throws AgentNotKnownException { * @return all local registered UserAgents */ public UserAgent[] getRegisteredAgents() { - Vector result = new Vector(); + Vector result = new Vector<>(); for (MessageReceiver rec : htRegisteredReceivers.values()) { if (rec instanceof UserAgent) { @@ -1076,7 +1075,7 @@ public UserAgent[] getRegisteredAgents() { * @return all local registered ServiceAgents */ public ServiceAgent[] getRegisteredServices() { - Vector result = new Vector(); + Vector result = new Vector<>(); for (MessageReceiver rec : htRegisteredReceivers.values()) { if (rec instanceof ServiceAgent) { @@ -1097,8 +1096,8 @@ public ServiceAgent[] getRegisteredServices() { * @throws L2pSecurityException * @throws AgentAlreadyRegisteredException */ - public Mediator createMediatorForAgent(Agent agent) throws AgentNotKnownException, L2pSecurityException, - AgentAlreadyRegisteredException { + public Mediator createMediatorForAgent(Agent agent) + throws AgentNotKnownException, L2pSecurityException, AgentAlreadyRegisteredException { if (agent.isLocked()) { throw new L2pSecurityException("You need to unlock the agent for mediation!"); } @@ -1123,8 +1122,8 @@ public Mediator createMediatorForAgent(Agent agent) throws AgentNotKnownExceptio * @throws L2pSecurityException * @throws AgentException */ - public abstract void storeAgent(Agent agent) throws AgentAlreadyRegisteredException, L2pSecurityException, - AgentException; + public abstract void storeAgent(Agent agent) + throws AgentAlreadyRegisteredException, L2pSecurityException, AgentException; /** * Updates an existing agent of the network. @@ -1278,8 +1277,8 @@ public Serializable invoke(Agent executing, ServiceNameVersion service, String m * @throws InterruptedException */ public Serializable invoke(Agent executing, ServiceNameVersion service, String method, Serializable[] parameters, - boolean exactVersion) throws L2pSecurityException, AgentNotKnownException, L2pServiceException, - InterruptedException { + boolean exactVersion) + throws L2pSecurityException, AgentNotKnownException, L2pServiceException, InterruptedException { return invoke(executing, service, method, parameters, exactVersion, false); } @@ -1300,8 +1299,8 @@ public Serializable invoke(Agent executing, ServiceNameVersion service, String m * @throws InterruptedException */ public Serializable invoke(Agent executing, ServiceNameVersion service, String method, Serializable[] parameters, - boolean exactVersion, boolean localOnly) throws L2pSecurityException, AgentNotKnownException, - L2pServiceException, InterruptedException { + boolean exactVersion, boolean localOnly) + throws L2pSecurityException, AgentNotKnownException, L2pServiceException, InterruptedException { if (getStatus() != NodeStatus.RUNNING) { throw new IllegalStateException("You can invoke methods only on a running node!"); @@ -1357,8 +1356,8 @@ public Serializable invoke(Agent executing, ServiceNameVersion service, String m * @throws L2pServiceException */ public Serializable invokeLocally(Agent executing, ServiceAgent serviceAgent, String method, - Serializable[] parameters) throws L2pSecurityException, AgentNotKnownException, InterruptedException, - L2pServiceException { + Serializable[] parameters) + throws L2pSecurityException, AgentNotKnownException, InterruptedException, L2pServiceException { if (getStatus() != NodeStatus.RUNNING) { throw new IllegalStateException("You can invoke methods only on a running node!"); @@ -1491,10 +1490,10 @@ public Serializable invokeGlobally(Agent executing, long serviceAgentId, Object return ((RMIResultContent) resultContent).getContent(); } else { // Do not log service class name (privacy..) - this.observerNotice(Event.RMI_FAILED, this.getNodeId(), executing, "Unknown RMI response type: " - + resultContent.getClass().getCanonicalName()); - throw new ServiceInvocationException("Unknown RMI response type: " - + resultContent.getClass().getCanonicalName()); + this.observerNotice(Event.RMI_FAILED, this.getNodeId(), executing, + "Unknown RMI response type: " + resultContent.getClass().getCanonicalName()); + throw new ServiceInvocationException( + "Unknown RMI response type: " + resultContent.getClass().getCanonicalName()); } } catch (AgentNotKnownException e) { // Do not log service class name (privacy..) @@ -1551,8 +1550,8 @@ public boolean handoverAnswer(Message answer) { return false; } - observerNotice(Event.MESSAGE_RECEIVED_ANSWER, answer.getSendingNodeId(), answer.getSenderId(), - this.getNodeId(), answer.getRecipientId(), "" + answer.getResponseToId()); + observerNotice(Event.MESSAGE_RECEIVED_ANSWER, answer.getSendingNodeId(), answer.getSenderId(), this.getNodeId(), + answer.getRecipientId(), "" + answer.getResponseToId()); MessageResultListener listener = htAnswerListeners.get(answer.getResponseToId()); if (listener == null) { @@ -1627,8 +1626,8 @@ public Message sendMessageAndWaitForAnswer(Message m, Object atNodeId) throws Ag * @throws InterruptedException * @throws TimeoutException */ - public Message[] sendMessageAndCollectAnswers(Message m, int recipientCount) throws InterruptedException, - TimeoutException { + public Message[] sendMessageAndCollectAnswers(Message m, int recipientCount) + throws InterruptedException, TimeoutException { long timeout = m.getTimeoutTs() - new Date().getTime(); MessageResultListener listener = new MessageResultListener(timeout, timeout / 4); listener.addRecipients(recipientCount); diff --git a/src/main/java/i5/las2peer/p2p/NodeServiceCache.java b/src/main/java/i5/las2peer/p2p/NodeServiceCache.java index d14e0c17c..f18d36436 100644 --- a/src/main/java/i5/las2peer/p2p/NodeServiceCache.java +++ b/src/main/java/i5/las2peer/p2p/NodeServiceCache.java @@ -1,8 +1,10 @@ package i5.las2peer.p2p; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.SortedMap; import java.util.SortedSet; @@ -396,6 +398,29 @@ public ServiceAgent getLocalService(ServiceNameVersion service) throws AgentNotK } } + public List getLocalServiceNames() { + ArrayList result = new ArrayList<>(); + synchronized (localServices) { + if (localServices != null) { + result = new ArrayList<>(localServices.keySet()); + } + } + return result; + } + + public List getLocalServiceVersions(String serviceName) { + ArrayList result = new ArrayList<>(); + synchronized (localServices) { + if (localServices != null) { + SortedMap versions = localServices.get(serviceName); + if (versions != null) { + result.addAll(versions.keySet()); + } + } + } + return result; + } + /** * represents an instance of a service agent * diff --git a/src/main/java/i5/las2peer/p2p/PastryNodeImpl.java b/src/main/java/i5/las2peer/p2p/PastryNodeImpl.java index fda0e3105..903da4cde 100644 --- a/src/main/java/i5/las2peer/p2p/PastryNodeImpl.java +++ b/src/main/java/i5/las2peer/p2p/PastryNodeImpl.java @@ -65,10 +65,20 @@ public class PastryNodeImpl extends Node { private static final L2pLogger logger = L2pLogger.getInstance(PastryNodeImpl.class); - private static final int AGENT_GET_TIMEOUT = 10000; - private static final int AGENT_STORE_TIMEOUT = 10000; - private static final int ARTIFACT_GET_TIMEOUT = 10000; - private static final int ARTIFACT_STORE_TIMEOUT = 10000; + /** + * The PAST_MESSAGE_TIMEOUT defines when a message in a Past (shared storage) operation is considered lost. This + * means all other timeouts depend on this value. + */ + private static final int PAST_MESSAGE_TIMEOUT = 60000; + // FIXME the timeouts should be PER STORAGE OPERATION and for the complete fetch or store process, as there might + // have to be send several messages for a single operation. Their value should be equal to PAST_MESSAGE_TIMEOUT plus + // a grace value of a few seconds. + private static final int AGENT_GET_TIMEOUT = 300000; + private static final int AGENT_STORE_TIMEOUT = 300000; + private static final int ARTIFACT_GET_TIMEOUT = 300000; + private static final int ARTIFACT_STORE_TIMEOUT = 300000; + private static final int HASHED_FETCH_TIMEOUT = 300000; + private static final int HASHED_STORE_TIMEOUT = 300000; private InetAddress pastryBindAddress = null; // null = detect Internet address @@ -84,7 +94,7 @@ public class PastryNodeImpl extends Node { private NodeApplication application; private SharedStorage pastStorage; private STORAGE_MODE mode = STORAGE_MODE.FILESYSTEM; - private String storageDir; // null = default choosen by SharedStorage + private String storageDir; // null = default chosen by SharedStorage private Long nodeIdSeed; /** @@ -109,7 +119,6 @@ public PastryNodeImpl(L2pClassManager classLoader, boolean useMonitoringObserver this.mode = storageMode; this.storageDir = null; // null = SharedStorage chooses directory this.nodeIdSeed = nodeIdSeed; - setupPastryEnvironment(); this.setStatus(NodeStatus.CONFIGURED); } @@ -150,75 +159,9 @@ public PastryNodeImpl(L2pClassManager classManager, Integer port, String bootstr this.mode = storageMode; this.storageDir = storageDir; this.nodeIdSeed = nodeIdSeed; - setupPastryEnvironment(); this.setStatus(NodeStatus.CONFIGURED); } - /** - * setup pastry environment settings - */ - private void setupPastryEnvironment() { - pastryEnvironment = new Environment(); - String[] configFiles = new String[] { "etc/pastry.properties", "config/pastry.properties", - "properties/pastry.properties" }; - String found = null; - for (String filename : configFiles) { - try { - if (new File(filename).exists()) { - found = filename; - break; - } - } catch (Exception e) { - logger.log(Level.FINER, "Exception while checking for config file '" + filename + "'", e); - } - } - Hashtable properties = new Hashtable<>(); - if (found != null) { - System.out.println("Using pastry property file " + found); - try { - Properties props = new Properties(); - props.load(new FileInputStream(found)); - - for (Object propname : props.keySet()) { - properties.put((String) propname, (String) props.get(propname)); - } - } catch (FileNotFoundException e) { - System.err.println("Unable to open property file " + found); - } catch (IOException e) { - System.err.println("Error opening property file " + found + ": " + e.getMessage()); - } - } else { - System.out.println("No pastry property file found - using default values"); - } - if (!properties.containsKey("nat_search_policy")) { - properties.put("nat_search_policy", "never"); - } - if (!properties.containsKey("firewall_test_policy")) { - properties.put("firewall_test_policy", "never"); - } - if (!properties.containsKey("nat_network_prefixes")) { - properties.put("nat_network_prefixes", "127.0.0.1;10.;192.168.;"); - } - if (pastryBindAddress != null && pastryBindAddress.isLoopbackAddress()) { - properties.put("allow_loopback_address", "1"); - } - if (!properties.containsKey("p2p_past_messageTimeout")) { - properties.put("p2p_past_messageTimeout", "120000"); - } - if (!properties.containsKey("pastry_socket_known_network_address")) { - if (!properties.containsKey("pastry_socket_known_network_address_port")) { - properties.put("pastry_socket_known_network_address_port", "80"); - } - } - if (!properties.containsKey("nat_search_policy")) { - properties.put("nat_search_policy", "never"); - } - for (String prop : properties.keySet()) { - pastryEnvironment.getParameters().setString(prop, properties.get(prop)); - logger.info("setting: " + prop + ": '" + properties.get(prop) + "'"); - } - } - /** * access to the underlying pastry node * @@ -290,10 +233,11 @@ private void setupPastryApplications() throws StorageException { */ @Override protected void launchSub() throws NodeException { - try { setStatus(NodeStatus.STARTING); + setupPastryEnvironment(); + NodeIdFactory nidFactory = null; if (nodeIdSeed == null) { nidFactory = new RandomNodeIdFactory(pastryEnvironment); @@ -357,6 +301,71 @@ public rice.pastry.Id generateNodeId() { } } + /** + * setup pastry environment settings + */ + private void setupPastryEnvironment() { + pastryEnvironment = new Environment(); + String[] configFiles = new String[] { "etc/pastry.properties", "config/pastry.properties", + "properties/pastry.properties" }; + String found = null; + for (String filename : configFiles) { + try { + if (new File(filename).exists()) { + found = filename; + break; + } + } catch (Exception e) { + logger.log(Level.FINER, "Exception while checking for config file '" + filename + "'", e); + } + } + Hashtable properties = new Hashtable<>(); + if (found != null) { + System.out.println("Using pastry property file " + found); + try { + Properties props = new Properties(); + props.load(new FileInputStream(found)); + + for (Object propname : props.keySet()) { + properties.put((String) propname, (String) props.get(propname)); + } + } catch (FileNotFoundException e) { + System.err.println("Unable to open property file " + found); + } catch (IOException e) { + System.err.println("Error opening property file " + found + ": " + e.getMessage()); + } + } else { + logger.fine("No pastry property file found - using default values"); + } + if (!properties.containsKey("nat_search_policy")) { + properties.put("nat_search_policy", "never"); + } + if (!properties.containsKey("firewall_test_policy")) { + properties.put("firewall_test_policy", "never"); + } + if (!properties.containsKey("nat_network_prefixes")) { + properties.put("nat_network_prefixes", "127.0.0.1;10.;192.168.;"); + } + if (pastryBindAddress != null && pastryBindAddress.isLoopbackAddress()) { + properties.put("allow_loopback_address", "1"); + } + if (!properties.containsKey("p2p_past_messageTimeout")) { + properties.put("p2p_past_messageTimeout", Integer.toString(PAST_MESSAGE_TIMEOUT)); + } + if (!properties.containsKey("pastry_socket_known_network_address")) { + if (!properties.containsKey("pastry_socket_known_network_address_port")) { + properties.put("pastry_socket_known_network_address_port", "80"); + } + } + if (!properties.containsKey("nat_search_policy")) { + properties.put("nat_search_policy", "never"); + } + for (String prop : properties.keySet()) { + pastryEnvironment.getParameters().setString(prop, properties.get(prop)); + logger.fine("setting: " + prop + ": '" + properties.get(prop) + "'"); + } + } + /** * register node shutdown as JVM shutdown method */ diff --git a/src/main/java/i5/las2peer/p2p/pastry/NodeApplication.java b/src/main/java/i5/las2peer/p2p/pastry/NodeApplication.java index 4e504b89a..bf6051fba 100644 --- a/src/main/java/i5/las2peer/p2p/pastry/NodeApplication.java +++ b/src/main/java/i5/las2peer/p2p/pastry/NodeApplication.java @@ -51,7 +51,7 @@ public class NodeApplication implements Application, ScribeMultiClient { public static final long SEARCH_TIMEOUT = 10000; // 10 seconds private static final int RESPONSE_WAIT_TIMEOUT = 10000; // 10 seconds - private final L2pLogger logger = L2pLogger.getInstance(NodeApplication.class.getName()); + private static final L2pLogger logger = L2pLogger.getInstance(NodeApplication.class.getName()); protected Endpoint endpoint; @@ -59,13 +59,13 @@ public class NodeApplication implements Application, ScribeMultiClient { private Scribe scribeClient; - private Hashtable htAgentTopics = new Hashtable(); + private Hashtable htAgentTopics = new Hashtable<>(); - private Hashtable htTopics = new Hashtable(); + private Hashtable htTopics = new Hashtable<>(); - private Hashtable> htPendingAgentSearches = new Hashtable>(); + private Hashtable> htPendingAgentSearches = new Hashtable<>(); - private Hashtable> appMessageWaiters = new Hashtable>(); + private Hashtable> appMessageWaiters = new Hashtable<>(); /** * create a pastry application for the given node @@ -88,8 +88,9 @@ public NodeApplication(PastryNodeImpl node) { */ public void registerAgentTopic(MessageReceiver receiver) { synchronized (htAgentTopics) { - if (htAgentTopics.get(receiver.getResponsibleForAgentId()) != null) + if (htAgentTopics.get(receiver.getResponsibleForAgentId()) != null) { return; + } Topic agentTopic = getAgentTopic(receiver); @@ -103,8 +104,8 @@ public void registerAgentTopic(MessageReceiver receiver) { scribeClient.subscribe(agentTopic, this, new AgentJoinedContent(getLocalHandle(), receiver.getResponsibleForAgentId()), root); - l2pNode.observerNotice(Event.PASTRY_TOPIC_SUBSCRIPTION_SUCCESS, this.l2pNode.getNodeId(), receiver, "" - + agentTopic.getId()); + l2pNode.observerNotice(Event.PASTRY_TOPIC_SUBSCRIPTION_SUCCESS, this.l2pNode.getNodeId(), receiver, + "" + agentTopic.getId()); /* System.out.println( "children of agent topic: " + scribeClient.numChildren(getAgentTopic(receiver)) ); for ( NodeHandle nh: scribeClient.getChildrenOfTopic(getAgentTopic ( receiver ))) @@ -124,8 +125,9 @@ public void unregisterAgentTopic(long id) throws AgentNotKnownException { Topic agentTopic = htAgentTopics.get(id); - if (agentTopic == null) + if (agentTopic == null) { throw new AgentNotKnownException("an agent with id " + id + " is not registered at this node"); + } scribeClient.unsubscribe(agentTopic, this); htAgentTopics.remove(id); @@ -136,8 +138,9 @@ public void unregisterAgentTopic(long id) throws AgentNotKnownException { public void registerTopic(long id) { synchronized (htTopics) { - if (htTopics.get(id) != null) + if (htTopics.get(id) != null) { return; + } Topic topic = getTopic(id); @@ -156,8 +159,9 @@ public void unregisterTopic(long id) throws NodeException { Topic topic = htTopics.get(id); - if (topic == null) + if (topic == null) { throw new NodeException("topic not found"); + } scribeClient.unsubscribe(topic, this); htTopics.remove(id); @@ -180,7 +184,7 @@ public NodeInformation getNodeInformation(NodeHandle nodeHandle) throws NodeNotF sendMessageDirectly(gim, nodeHandle); try { - WaiterThread waiter = new WaiterThread(RESPONSE_WAIT_TIMEOUT); + WaiterThread waiter = new WaiterThread<>(RESPONSE_WAIT_TIMEOUT); appMessageWaiters.put(messageId, waiter); waiter.start(); waiter.join(); @@ -188,8 +192,9 @@ public NodeInformation getNodeInformation(NodeHandle nodeHandle) throws NodeNotF if (waiter.hasResult()) { InfoResponseMessage irm = (InfoResponseMessage) waiter.getResult(); return irm.getInfoContent(); - } else + } else { throw new NodeNotFoundException("Timeout waiting for information answer"); + } } catch (InterruptedException e) { throw new NodeNotFoundException("Interrupted while waiting for answer"); } finally { @@ -234,13 +239,14 @@ public void run() { (Long) null, ""); // just store the sending node handle - HashSet pendingCollection = htPendingAgentSearches.get(((SearchAnswerMessage) pastMessage) - .getRequestMessageId()); + HashSet pendingCollection = htPendingAgentSearches + .get(((SearchAnswerMessage) pastMessage).getRequestMessageId()); - if (pendingCollection != null) + if (pendingCollection != null) { pendingCollection.add(((SearchAnswerMessage) pastMessage).getSendingNode()); - else + } else { logger.warning("got a timed out response or response to a message not sent by me!"); + } } else if (pastMessage instanceof GetInfoMessage) { // just send a response GetInfoMessage gim = (GetInfoMessage) pastMessage; @@ -256,10 +262,10 @@ public void run() { InfoResponseMessage irm = (InfoResponseMessage) pastMessage; WaiterThread waiter = appMessageWaiters.get(irm.getResponseToId()); - if (waiter == null) + if (waiter == null) { l2pNode.observerNotice(Event.MESSAGE_FAILED, l2pNode.getNodeId(), (MessageReceiver) null, "Got an answer to an information request I do not know from " + irm.getSender()); - else { + } else { l2pNode.observerNotice(Event.MESSAGE_RECEIVED, l2pNode.getNodeId(), (MessageReceiver) null, "Got an answer for Information request " + irm.getResponseToId() + "from " + irm.getSender()); waiter.collectResult(irm); @@ -317,8 +323,8 @@ public void routeMyMsgDirect(NodeHandle nh) { * @throws AgentNotKnownException * @throws MessageException */ - public void sendMessage(MessageEnvelope m, NodeHandle to) throws MalformedXMLException, L2pSecurityException, - AgentNotKnownException, MessageException { + public void sendMessage(MessageEnvelope m, NodeHandle to) + throws MalformedXMLException, L2pSecurityException, AgentNotKnownException, MessageException { l2pNode.observerNotice(Event.MESSAGE_SENDING, l2pNode.getPastryNode(), m.getContainedMessage().getSender(), to, m.getContainedMessage().getRecipient(), "message: " + m); @@ -390,14 +396,15 @@ public Collection searchAgent(long agentId, int expectedAnswers) { System.out.println("parent: " + scribeClient.getParent(agentTopic)); // System.out.println( "children: " + scribeClient.getChildren(agentTopic).length ); - for (NodeHandle nh : scribeClient.getChildrenOfTopic(getAgentTopic(agentId))) + for (NodeHandle nh : scribeClient.getChildrenOfTopic(getAgentTopic(agentId))) { System.out.println("Child in search: " + nh); + } - l2pNode.observerNotice(Event.AGENT_SEARCH_STARTED, this.l2pNode.getNodeId(), agentId, null, (Long) null, "(" - + expectedAnswers + ") - topic: " + getAgentTopic(agentId)); + l2pNode.observerNotice(Event.AGENT_SEARCH_STARTED, this.l2pNode.getNodeId(), agentId, null, (Long) null, + "(" + expectedAnswers + ") - topic: " + getAgentTopic(agentId)); SearchAgentContent search = new SearchAgentContent(getLocalHandle(), agentId); - HashSet resultSet = new HashSet(); + HashSet resultSet = new HashSet<>(); htPendingAgentSearches.put(search.getRandomId(), resultSet); // publish a message to search the agent registers @@ -407,18 +414,19 @@ public Collection searchAgent(long agentId, int expectedAnswers) { long timeout = new Date().getTime() + SEARCH_TIMEOUT; // todo: use a waiterThread here - while (new Date().getTime() <= timeout && resultSet.size() < expectedAnswers) + while (new Date().getTime() <= timeout && resultSet.size() < expectedAnswers) { try { System.out.println("\t\t waiting for agent-search (" + agentId + ") - " + (-new Date().getTime() + timeout) / 1000 + "s left"); Thread.sleep(SEARCH_SLEEP_TIME); } catch (InterruptedException e) { } + } htPendingAgentSearches.remove(search.getRandomId()); - l2pNode.observerNotice(Event.AGENT_SEARCH_FINISHED, this.l2pNode.getNodeId(), agentId, null, (Long) null, "" - + resultSet.size()); + l2pNode.observerNotice(Event.AGENT_SEARCH_FINISHED, this.l2pNode.getNodeId(), agentId, null, (Long) null, + "" + resultSet.size()); return resultSet; } @@ -452,10 +460,10 @@ public void deliver(Topic topic, ScribeContent content) { // found the agent // send message to searching node - endpoint.route( - null, - new SearchAnswerMessage(((SearchAgentContent) content).getOrigin(), this.l2pNode - .getPastryNode().getLocalNodeHandle(), ((SearchAgentContent) content).getRandomId()), + endpoint.route(null, + new SearchAnswerMessage(((SearchAgentContent) content).getOrigin(), + this.l2pNode.getPastryNode().getLocalNodeHandle(), + ((SearchAgentContent) content).getRandomId()), ((SearchAgentContent) content).getOrigin()); // send return message @@ -466,7 +474,8 @@ public void deliver(Topic topic, ScribeContent content) { logger.severe("\t\t<--- subscribed but agent not found!!!!"); } else if (content instanceof AgentJoinedContent) { - logger.info("\t\t<--- got notification about agent joining: " + ((AgentJoinedContent) content).getAgentId()); + logger.info( + "\t\t<--- got notification about agent joining: " + ((AgentJoinedContent) content).getAgentId()); } else if (content instanceof BroadcastMessageContent) { final BroadcastMessageContent c = (BroadcastMessageContent) content; @@ -505,8 +514,9 @@ public void subscribeFailed(Topic topic) { public void subscribeFailed(Collection topics) { // System.out.println(ColoredOutput.colorize( "topic subscription failed for collection of topics!", // ForegroundColor.Yellow)); - for (Topic t : topics) + for (Topic t : topics) { subscribeFailed(t); + } } @Override diff --git a/src/main/java/i5/las2peer/persistency/SharedStorage.java b/src/main/java/i5/las2peer/persistency/SharedStorage.java index 3e7c97a68..8fc8226fc 100644 --- a/src/main/java/i5/las2peer/persistency/SharedStorage.java +++ b/src/main/java/i5/las2peer/persistency/SharedStorage.java @@ -38,7 +38,6 @@ import i5.las2peer.persistency.pastry.PastInsertContinuation; import i5.las2peer.persistency.pastry.PastLookupContinuation; import i5.las2peer.security.Agent; -import i5.las2peer.security.L2pSecurityException; import i5.las2peer.tools.CryptoException; import i5.las2peer.tools.SerializationException; import i5.las2peer.tools.SerializeTools; @@ -123,6 +122,28 @@ public SharedStorage(Node node, STORAGE_MODE storageMode, ExecutorService thread versionCache = new ConcurrentHashMap<>(); } + private void lookupHandles(Id id, StorageLookupHandler lookupHandler, StorageExceptionHandler exceptionHandler) { + pastStorage.lookupHandles(id, numOfReplicas + 1, + new PastLookupContinuation(threadpool, lookupHandler, exceptionHandler)); + } + + private void waitForStoreResult(StoreProcessHelper resultHelper, long timeoutMs) throws StorageException { + long startWait = System.currentTimeMillis(); + while (System.currentTimeMillis() - startWait < timeoutMs) { + try { + if (resultHelper.getResult() >= 0) { + return; + } + Thread.sleep(1); + } catch (StorageException e) { + throw e; + } catch (Exception e) { + throw new StorageException(e); + } + } + throw new StorageException("store operation timed out"); + } + @Override public Envelope createEnvelope(String identifier, Serializable content, Agent... readers) throws IllegalArgumentException, SerializationException, CryptoException { @@ -172,26 +193,7 @@ public void storeEnvelope(Envelope envelope, Agent author, long timeoutMs) throw } StoreProcessHelper resultHelper = new StoreProcessHelper(); storeEnvelopeAsync(envelope, author, resultHelper, resultHelper, resultHelper); - long startWait = System.nanoTime(); - while (System.nanoTime() - startWait < timeoutMs * 1000000) { - try { - if (resultHelper.getResult() >= 0) { - return; - } - } catch (Exception e) { - if (e instanceof StorageException) { - throw (StorageException) e; - } else { - throw new StorageException(e); - } - } - try { - Thread.sleep(1); - } catch (InterruptedException e) { - throw new StorageException(e); - } - } - throw new StorageException("store operation timed out"); + waitForStoreResult(resultHelper, timeoutMs); } @Override @@ -211,22 +213,21 @@ private void storeEnvelopeAsync(Envelope envelope, Agent author, StorageStoreRes @Override public void run() { logger.info("Checking collision for " + envelope.toString()); - pastStorage.lookupHandles( - MetadataArtifact.buildMetadataId(artifactIdFactory, envelope.getIdentifier(), - envelope.getVersion()), - numOfReplicas + 1, new PastLookupContinuation(threadpool, new StorageLookupHandler() { - @Override - public void onLookup(ArrayList metadataHandles) { - if (metadataHandles.size() > 0) { - // collision detected - handleCollision(envelope, author, metadataHandles, resultHandler, collisionHandler, - exceptionHandler, mergeCounter); - } else { - // no collision -> try insert - insertEnvelope(envelope, author, resultHandler, exceptionHandler); - } - } - }, exceptionHandler)); + Id metadataId = MetadataArtifact.buildMetadataId(artifactIdFactory, envelope.getIdentifier(), + envelope.getVersion()); + lookupHandles(metadataId, new StorageLookupHandler() { + @Override + public void onLookup(ArrayList metadataHandles) { + if (metadataHandles.size() > 0) { + // collision detected + handleCollision(envelope, author, metadataHandles, resultHandler, collisionHandler, + exceptionHandler, mergeCounter); + } else { + // no collision -> try insert + insertEnvelope(envelope, author, resultHandler, exceptionHandler); + } + } + }, exceptionHandler); } }); } @@ -311,7 +312,7 @@ private void insertEnvelope(Envelope envelope, Agent author, StorageStoreResultH pastStorage.insert(toStore, new PastInsertContinuation(threadpool, multiResult, multiResult, toStore)); offset += partsize; } - } catch (CryptoException | L2pSecurityException e) { + } catch (Exception e) { if (exceptionHandler != null) { exceptionHandler.onException(e); } @@ -365,10 +366,11 @@ public void onResult(Serializable envelope, int successfulOperations) { } } }, exceptionHandler, metadataArtifact)); - } catch (CryptoException | L2pSecurityException | SerializationException e) { + } catch (Exception e) { if (exceptionHandler != null) { exceptionHandler.onException(e); } + return; } } @@ -385,23 +387,17 @@ public Envelope fetchEnvelope(String identifier, long version, long timeoutMs) } FetchProcessHelper resultHelper = new FetchProcessHelper(); fetchEnvelopeAsync(identifier, version, resultHelper, resultHelper); - long startWait = System.nanoTime(); - while (System.nanoTime() - startWait < timeoutMs * 1000000) { + long startWait = System.currentTimeMillis(); + while (System.currentTimeMillis() - startWait < timeoutMs) { try { Envelope result = resultHelper.getResult(); if (result != null) { return result; } - } catch (Exception e) { - if (e instanceof StorageException) { - throw (StorageException) e; - } else { - throw new StorageException(e); - } - } - try { Thread.sleep(1); - } catch (InterruptedException e) { + } catch (StorageException e) { + throw e; + } catch (Exception e) { throw new StorageException(e); } } @@ -455,26 +451,23 @@ public void onEnvelopeReceived(Envelope result) { }, artifactIdFactory, pastStorage, numOfReplicas + 1, threadpool)); } else { Id checkId = MetadataArtifact.buildMetadataId(artifactIdFactory, identifier, version); - pastStorage.lookupHandles(checkId, numOfReplicas + 1, - new PastLookupContinuation(threadpool, new StorageLookupHandler() { - - @Override - public void onLookup(ArrayList metadataHandles) { - if (metadataHandles.size() > 0) { - // call from first part - fetchWithMetadata(metadataHandles, envelopeHandler, exceptionHandler); - } else { - // not found - if (exceptionHandler != null) { - exceptionHandler - .onException(new ArtifactNotFoundException("Envelope with identifier '" - + identifier + "' and version " + version + " (" - + checkId.toStringFull() + ") not found in shared storage!")); - } - } + lookupHandles(checkId, new StorageLookupHandler() { + @Override + public void onLookup(ArrayList metadataHandles) { + if (metadataHandles.size() > 0) { + // call from first part + fetchWithMetadata(metadataHandles, envelopeHandler, exceptionHandler); + } else { + // not found + if (exceptionHandler != null) { + exceptionHandler.onException(new ArtifactNotFoundException( + "Envelope with identifier '" + identifier + "' and version " + version + " (" + + checkId.toStringFull() + ") not found in shared storage!")); } + } + } - }, exceptionHandler)); + }, exceptionHandler); } } @@ -525,21 +518,18 @@ private void fetchPart(String identifier, int part, long version, MultiArtifactH Id checkId = EnvelopeArtifact.buildId(artifactIdFactory, identifier, part); logger.info("Fetching part (" + part + ") of envelope '" + identifier + "' with id " + checkId.toStringFull() + " ..."); - pastStorage.lookupHandles(checkId, numOfReplicas + 1, - new PastLookupContinuation(threadpool, new StorageLookupHandler() { - @Override - public void onLookup(ArrayList handles) { - logger.info("Got " + handles.size() + " past handles for part (" + part + ") of '" + identifier - + "'"); - if (handles.size() < 1) { - artifactHandler.onException(new ArtifactNotFoundException( - "Part (" + part + ") of '" + identifier + "' with id (" + checkId.toStringFull() - + ") not found in shared storage!")); - } else { - fetchFromHandles(handles, artifactHandler, artifactHandler); - } - } - }, artifactHandler)); + lookupHandles(checkId, new StorageLookupHandler() { + @Override + public void onLookup(ArrayList handles) { + logger.info("Got " + handles.size() + " past handles for part (" + part + ") of '" + identifier + "'"); + if (handles.size() < 1) { + artifactHandler.onException(new ArtifactNotFoundException("Part (" + part + ") of '" + identifier + + "' with id (" + checkId.toStringFull() + ") not found in shared storage!")); + } else { + fetchFromHandles(handles, artifactHandler, artifactHandler); + } + } + }, artifactHandler); } private void fetchFromHandles(ArrayList handles, StorageArtifactHandler artifactHandler, @@ -610,8 +600,10 @@ private static Envelope buildFromParts(PastryIdFactory artifactIdFactory, Metada } try { byte[] rawContent = artifact.getContent(); - // add content to buffer - baos.write(rawContent); + if (rawContent != null) { + // add content to buffer + baos.write(rawContent); + } } catch (VerificationFailedException e) { throw new StorageException("Could not retrieve content from part", e); } diff --git a/src/main/java/i5/las2peer/security/L2pSecurityManager.java b/src/main/java/i5/las2peer/security/L2pSecurityManager.java index f948b2010..627589983 100644 --- a/src/main/java/i5/las2peer/security/L2pSecurityManager.java +++ b/src/main/java/i5/las2peer/security/L2pSecurityManager.java @@ -24,7 +24,7 @@ public class L2pSecurityManager extends SecurityManager { public L2pSecurityManager() { // check if local policy file exists, otherwise extract it from jar if (!new File("etc/las2peer.policy").exists()) { - logger.info("Policy file not found. Extracting default policy file from jar..."); + logger.info("Sandbox policy file not found. Extracting default policy file from jar..."); InputStream fromJar = this.getClass().getResourceAsStream("/las2peer.policy"); if (fromJar != null) { try { @@ -34,7 +34,7 @@ public L2pSecurityManager() { logger.printStackTrace(e); } } else { - logger.severe("Fatal Error! No local policy file and no file in jar! Sandboxing WILL NOT WORK!"); + throw new IllegalStateException("No local policy file and no file in jar! Sandboxing WILL NOT WORK!"); } } System.setProperty("java.security.policy", "etc/las2peer.policy"); diff --git a/src/main/java/i5/las2peer/tools/L2pNodeLauncher.java b/src/main/java/i5/las2peer/tools/L2pNodeLauncher.java index 025fb7c84..048e6dc8e 100644 --- a/src/main/java/i5/las2peer/tools/L2pNodeLauncher.java +++ b/src/main/java/i5/las2peer/tools/L2pNodeLauncher.java @@ -26,7 +26,6 @@ import i5.las2peer.api.exceptions.StorageException; import i5.las2peer.classLoaders.L2pClassManager; import i5.las2peer.classLoaders.libraries.FileSystemRepository; -import i5.las2peer.classLoaders.libraries.Repository; import i5.las2peer.communication.ListMethodsContent; import i5.las2peer.communication.Message; import i5.las2peer.execution.L2pServiceException; @@ -160,6 +159,9 @@ private Hashtable loadPassphrases(String filename) { content = FileContentReader.read(file).split("\n"); for (String line : content) { line = line.trim(); + if (line.isEmpty()) { + continue; + } String[] split = line.split(";", 2); if (split.length != 2) { printWarning("Ignoring invalid passphrase line (" + line + ") in '" + filename + "'"); @@ -942,15 +944,17 @@ public static L2pNodeLauncher launchSingle(int port, String bootstrap, STORAGE_M printWarning("couldn't use '" + sLogDir + "' as log directory." + ex); } } - Repository[] repositories = new Repository[0]; // default only network class loading - if (serviceDirectories != null) { - repositories = new Repository[] { new FileSystemRepository(serviceDirectories, true) }; + if (serviceDirectories == null) { + ArrayList directories = new ArrayList<>(); + directories.add(DEFAULT_SERVICE_DIRECTORY); + serviceDirectories = directories; } if (commands == null) { commands = new ArrayList<>(); } // instantiate launcher - L2pClassManager cl = new L2pClassManager(repositories, L2pNodeLauncher.class.getClassLoader()); + L2pClassManager cl = new L2pClassManager(new FileSystemRepository(serviceDirectories, true), + L2pNodeLauncher.class.getClassLoader()); L2pNodeLauncher launcher = new L2pNodeLauncher(port, bootstrap, storageMode, observer, cl, nodeIdSeed); // handle commands try { @@ -1048,8 +1052,10 @@ public static void printHelp(String message) { System.out.println("\t--colored-shell|-c enables colored output (better readable command line)\n"); System.out.println("\t--log-directory|-l [directory] lets you choose the directory for log files (default: " + L2pLogger.DEFAULT_LOG_DIRECTORY + ")\n"); - System.out.println( - "\t--service-directory|-s [directory] adds the directory you added your services to, to the class loader. This argument can occur multiple times.\n"); + System.out + .println("\t--service-directory|-s [directory] adds the directory you added your services to (default: " + + DEFAULT_SERVICE_DIRECTORY + + ") to the class loader. This argument can occur multiple times.\n"); System.out.println("\t--port|-p [port] specifies the port number of the node\n"); System.out.println("\tno bootstrap argument states, that a complete new las2peer network is to start"); System.out.println("\tor"); diff --git a/src/main/java/i5/las2peer/tools/SimpleTools.java b/src/main/java/i5/las2peer/tools/SimpleTools.java index e8757607a..146c2a453 100644 --- a/src/main/java/i5/las2peer/tools/SimpleTools.java +++ b/src/main/java/i5/las2peer/tools/SimpleTools.java @@ -1,5 +1,8 @@ package i5.las2peer.tools; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Random; @@ -96,14 +99,16 @@ public static String join(Iterable objects, String glue) { * */ public static String repeat(String string, int count) { - if (string == null) + if (string == null) { return null; - else if (string.isEmpty() || count <= 0) + } else if (string.isEmpty() || count <= 0) { return ""; + } StringBuffer result = new StringBuffer(); - for (int i = 0; i < count; i++) + for (int i = 0; i < count; i++) { result.append(string); + } return result.toString(); } @@ -119,4 +124,15 @@ public static String repeat(Object o, int count) { return repeat(o.toString(), count); } + public static byte[] toByteArray(InputStream is) throws IOException { + ByteArrayOutputStream data = new ByteArrayOutputStream(); + int nRead; + byte[] buffer = new byte[8096]; + while ((nRead = is.read(buffer, 0, buffer.length)) != -1) { + data.write(buffer, 0, nRead); + } + data.flush(); + return data.toByteArray(); + } + } diff --git a/src/test/java/i5/las2peer/p2p/PastryNodeImplTest.java b/src/test/java/i5/las2peer/p2p/PastryNodeImplTest.java index 01cc0e58d..89dcc87d9 100755 --- a/src/test/java/i5/las2peer/p2p/PastryNodeImplTest.java +++ b/src/test/java/i5/las2peer/p2p/PastryNodeImplTest.java @@ -20,4 +20,17 @@ public void testSystemDefinedPort() { } } + @Test + public void testNodeRestart() { + try { + PastryNodeImpl testNode = TestSuite.launchNetwork(1).get(0); + testNode.shutDown(); + testNode.launch(); + testNode.shutDown(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + } diff --git a/src/test/java/i5/las2peer/persistency/MissingNodesTest.java b/src/test/java/i5/las2peer/persistency/MissingNodesTest.java new file mode 100644 index 000000000..8591c632b --- /dev/null +++ b/src/test/java/i5/las2peer/persistency/MissingNodesTest.java @@ -0,0 +1,88 @@ +package i5.las2peer.persistency; + +import java.util.ArrayList; +import java.util.Random; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import i5.las2peer.p2p.PastryNodeImpl; +import i5.las2peer.security.UserAgent; +import i5.las2peer.testing.MockAgentFactory; +import i5.las2peer.testing.TestSuite; + +public class MissingNodesTest { + + private static final Random r = new Random(); + private ArrayList nodes; + + @Before + public void startNetwork() { + try { + // start test node + nodes = TestSuite.launchNetwork(SharedStorage.DEFAULT_NUM_OF_REPLICAS + 1); + System.out.println("Test network with " + nodes.size() + " nodes started"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @After + public void stopNetwork() { + if (nodes != null) { + for (PastryNodeImpl node : nodes) { + node.shutDown(); + } + nodes = null; + } + } + + @Test + public void testStoreWithMissingNode() { + try { + // remove a node from network + nodes.remove(r.nextInt(nodes.size())).shutDown(); + // store test content + PastryNodeImpl active = nodes.get(r.nextInt(nodes.size())); + Envelope env = active.createUnencryptedEnvelope("test", "This is las2peer!"); + UserAgent smith = MockAgentFactory.getAdam(); + smith.unlockPrivateKey("adamspass"); + long start = System.currentTimeMillis(); + active.storeEnvelope(env, smith); + long delay = System.currentTimeMillis() - start; + System.out.println("Storing envelope with missing node took " + delay + "ms"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + + @Test + public void testFetchWithMissingNode() { + try { + // store test content + PastryNodeImpl active = nodes.get(r.nextInt(nodes.size())); + Envelope env = active.createUnencryptedEnvelope("test", "This is las2peer!"); + UserAgent smith = MockAgentFactory.getAdam(); + smith.unlockPrivateKey("adamspass"); + active.storeEnvelope(env, smith); + // shutdown a random node + nodes.remove(r.nextInt(nodes.size())).shutDown(); + // fetch content with random node + PastryNodeImpl fetching = nodes.get(r.nextInt(nodes.size())); + long start = System.currentTimeMillis(); + Envelope fetched = fetching.fetchEnvelope("test"); + long delay = System.currentTimeMillis() - start; + // verify content + Assert.assertEquals(env.getContent(), fetched.getContent()); + System.out.println("Fetching envelope with missing node took " + delay + "ms"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + +} diff --git a/src/test/java/i5/las2peer/persistency/PersistenceTest.java b/src/test/java/i5/las2peer/persistency/PersistenceTest.java index 6c49f353d..2017cbe42 100644 --- a/src/test/java/i5/las2peer/persistency/PersistenceTest.java +++ b/src/test/java/i5/las2peer/persistency/PersistenceTest.java @@ -5,9 +5,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; +import org.junit.Ignore; import org.junit.Test; -import org.junit.rules.TestName; import i5.las2peer.p2p.PastryNodeImpl; import i5.las2peer.persistency.SharedStorage.STORAGE_MODE; @@ -19,9 +18,6 @@ public class PersistenceTest { private ArrayList nodes; - @Rule - public TestName name = new TestName(); - @Before public void startNetwork() { try { @@ -44,11 +40,6 @@ public void stopNetwork() { } } - @Test - public void testStartStopNetwork() { - // just as time reference ... - } - @Test public void testFilesystemPersistence() { try { @@ -73,33 +64,48 @@ public void testFilesystemPersistence() { } } - // TODO test currently not working because of LAS-359, time to join the network is too high -// @Test -// public void testVersioning() { -// try { -// // start network and write data into shared storage -// PastryNodeImpl node1 = nodes.get(0); -// Envelope env = node1.createUnencryptedEnvelope("test", "This is las2peer!"); -// UserAgent smith = MockAgentFactory.getAdam(); -// smith.unlockPrivateKey("adamspass"); -// node1.storeEnvelope(env, smith); -// // shutdown node 2 -// PastryNodeImpl node2 = nodes.remove(1); -// node2.shutDown(); -// Thread.sleep(500); -// // update envelope -// env = node1.createUnencryptedEnvelope(env, "This is las2peer again!"); -// node1.storeEnvelope(env, smith); -// // start node 2 again (still with version 1 of env in storage -// node2 = TestSuite.addNode(node1.getPort(), STORAGE_MODE.FILESYSTEM, 1L); -// nodes.set(1, node2); -// // read data -// Envelope fetched = node2.fetchEnvelope("test"); -// Assert.assertEquals(env.getContent(), fetched.getContent()); -// } catch (Exception e) { -// e.printStackTrace(); -// Assert.fail(e.toString()); -// } -// } + @Ignore + @Test + public void testVersionSafety() { + try { + // start network and write data into shared storage + PastryNodeImpl node1 = nodes.get(0); + Envelope env = node1.createUnencryptedEnvelope("test", "This is las2peer!"); + UserAgent smith = MockAgentFactory.getAdam(); + smith.unlockPrivateKey("adamspass"); + node1.storeEnvelope(env, smith); + // shutdown node 2 + PastryNodeImpl node2 = nodes.remove(1); + node2.shutDown(); + // update envelope + Envelope updated = node1.createUnencryptedEnvelope(env, "This is las2peer again!"); + long start = System.currentTimeMillis(); + node1.storeEnvelope(updated, smith); + long stop = System.currentTimeMillis(); + System.out.println(stop - start); + // start node 2 again (still with version 1 of env in storage + node2 = TestSuite.addNode(node1.getPort(), STORAGE_MODE.FILESYSTEM, 1L); + nodes.set(1, node2); + // read data + Envelope fetched = node2.fetchEnvelope("test"); + Assert.assertEquals(updated.getContent(), fetched.getContent()); + /* + * The test output should also end with the following lines: + * + * Looking for metadata envelope with identifier 'test' and version 1 at id FDC232A97E1E3E66C6023E1306DC2C1CA9EFF2F9 ... + Lookup got 6 past handles for identifier 'test' and version 1 + Looking for metadata envelope with identifier 'test' and version 2 at id 1B225008E192E98014E4F24EC6655D160F0AFB86 ... + Lookup got 5 past handles for identifier 'test' and version 2 + Looking for metadata envelope with identifier 'test' and version 3 at id B7B3972C41D6E0E480F4292DA52AD6140B51BFD2 ... + Lookup got 0 past handles for identifier 'test' and version 3 + * + * Version 1 should have one more handle than version 2 and version 3 has zero handles. + * + */ + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } }