diff --git a/bin/backend-start.sh b/bin/backend-start.sh
index e62f9add9..9ff569687 100755
--- a/bin/backend-start.sh
+++ b/bin/backend-start.sh
@@ -8,5 +8,5 @@ source ${mydir}/lib.sh
 # If you want GRPC logging uncomment the following line
 # LOGGING="-Djava.util.logging.config.file=logging.properties"
 
-cd ${mydir}/../platform/
-java ${LOGGING} -server -jar target/hillview-server-jar-with-dependencies.jar 127.0.0.1:3569
+cd ${mydir}
+java ${LOGGING} -server -jar ../platform/target/hillview-server-jar-with-dependencies.jar 127.0.0.1:3569
diff --git a/bin/delete-data.py b/bin/delete-data.py
index 442affa65..9baf74097 100755
--- a/bin/delete-data.py
+++ b/bin/delete-data.py
@@ -22,7 +22,7 @@ def delete_folder(config, folder):
     assert isinstance(config, ClusterConfiguration)
     message = "Deleting " + folder + " from all hosts"
     logger.info(message)
-    config.run_on_all_workers(lambda rh: delete_remote_folder(rh, folder), True)
+    config.run_on_all_workers(lambda rh: delete_remote_folder(rh, folder))
 
 def main():
     """Main function"""
diff --git a/bin/deploy.py b/bin/deploy.py
index f0a7e51f8..429ccfc72 100755
--- a/bin/deploy.py
+++ b/bin/deploy.py
@@ -12,6 +12,25 @@
 
 logger = get_logger("deploy")
 
+def generate_script(config, rh, template):
+    """Generates a shell script based on a template, inserting configuration variables"""
+    logger.info("Generating script for host " + rh.host + " from " + template)
+    variables = ""
+    variables += "SERVICE_DIRECTORY=" + config.service_folder + "\n"
+    variables += "HEAPSIZE=\"" + rh.heapsize + "\"\n"
+    variables += "USER=" + rh.user + "\n"
+    variables += "WORKER_PORT=" + str(config.worker_port) + "\n"
+    variables += "AGGREGATOR_PORT=" + str(config.aggregator_port) + "\n"
+    variables += "CLEANUP=" + str(1 if config.cleanup_on_install() else 0) + "\n"
+    variables += "TOMCAT=" + config.tomcat + "\n"
+    lines = list(open(template))
+    filename = template.replace("-template", "")
+    lines = [variables if "REPLACE_WITH_VARIABLES" in x else x for x in lines]
+    with open(filename, "w") as f:
+        for l in lines:
+            f.write(l)
+    os.chmod(filename, 0o770)
+
 def prepare_webserver(config):
     """Deploys files needed by the Hillview web server"""
     logger.info("Creating web service folder")
@@ -21,7 +40,6 @@ def prepare_webserver(config):
     logger.info(message)
     rh.create_remote_folder(config.service_folder)
     rh.run_remote_shell_command("chown " + config.get_user() + " " + config.service_folder)
-    rh.create_remote_folder(config.service_folder + "/hillview")
 
     major = config.tomcat_version[0:config.tomcat_version.find('.')]
 
@@ -50,13 +68,10 @@ def prepare_webserver(config):
     tmp.close()
     rh.copy_file_to_remote(tmp.name, config.service_folder + "/serverlist", "")
     os.unlink(tmp.name)
-    if config.cleanup_on_install():
-        rh.run_remote_shell_command(
-            "cd " + config.service_folder + ";" + \
-            "rm -f hillview-web.log hillview-web.log.* hillview-web.log*.lck")
-    # link to web server logs
-    rh.run_remote_shell_command("ln -sf " + config.service_folder + "/hillview-web.log " + \
-        config.service_folder + "/hillview/hillview-web.log")
+    generate_script(config, rh, "hillview-webserver-manager-template.sh")
+    rh.copy_file_to_remote(
+        "hillview-webserver-manager.sh", config.service_folder, "")
+    os.unlink("hillview-webserver-manager.sh")
 
 def prepare_worker(config, rh):
     """Prepares files needed by a Hillview worker on a remote machine"""
@@ -64,17 +79,19 @@ def prepare_worker(config, rh):
     message = "Preparing worker " + str(rh)
     logger.info(message)
     # rh.run_remote_shell_command("sudo apt-get install libgfortran3")
+    rh.create_remote_folder(config.service_folder)
     rh.run_remote_shell_command("chown " + config.get_user() + " " + config.service_folder)
-    rh.create_remote_folder(config.service_folder + "/hillview")
+    rh.create_remote_folder(config.service_folder)
     rh.copy_file_to_remote(
         config.scriptFolder + "/../platform/target/hillview-server-jar-with-dependencies.jar",
-        config.service_folder + "/hillview", "")
-    if config.cleanup_on_install():
-        rh.run_remote_shell_command(
-            "cd " + config.service_folder + "/hillview;"
-            "rm -f hillview.log hillview.log.* hillview.log*.lck")
+        config.service_folder, "")
+    generate_script(config, rh, "hillview-worker-manager-template.sh")
+    rh.copy_file_to_remote(
+        "hillview-worker-manager.sh", config.service_folder, "")
+    rh.copy_file_to_remote("forever.sh", config.service_folder, "")
+    os.unlink("hillview-worker-manager.sh")
 
 def prepare_aggregator(config, rh):
     """Prepares files needed by a Hillview aggregator on a remote machine"""
@@ -83,23 +100,22 @@ def prepare_aggregator(config, rh):
     logger.info(message)
     rh.create_remote_folder(config.service_folder)
     rh.run_remote_shell_command("chown " + config.get_user() + " " + config.service_folder)
-    rh.create_remote_folder(config.service_folder + "/hillview")
+    rh.create_remote_folder(config.service_folder)
     rh.copy_file_to_remote(
-        config.scriptFolder +
-        "/../platform/target/hillview-server-jar-with-dependencies.jar",
-        config.service_folder + "/hillview", "")
-    if config.cleanup_on_install():
-        rh.run_remote_shell_command(
-            "cd " + config.service_folder + "/hillview;"
-            "rm -f hillview-agg.log hillview-agg.log.* hillview-agg.log*.lck")
+        config.scriptFolder + "/../platform/target/hillview-server-jar-with-dependencies.jar",
+        config.service_folder, "")
+    rh.copy_file_to_remote("forever.sh", config.service_folder, "")
     tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)
     for h in rh.children:
         tmp.write(h + ":" + str(config.worker_port) + "\n")
     tmp.close()
     rh.copy_file_to_remote(tmp.name, config.service_folder + "/workers", "")
     os.unlink(tmp.name)
-    rh.run_remote_shell_command("ln -sf " + config.service_folder + "/hillview-agg.log " + \
-        config.service_folder + "/hillview/hillview-agg.log")
+
+    generate_script(config, rh, "hillview-aggregator-manager-template.sh")
+    rh.copy_file_to_remote(
+        "hillview-aggregator-manager.sh", config.service_folder, "")
+    os.unlink("hillview-aggregator-manager.sh")
 
 def prepare_workers(config):
     """Prepares all Hillview workers"""
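For illustration, this is roughly the header that generate_script splices into a manager template in place of the #REPLACE_WITH_VARIABLES marker. The concrete values below are hypothetical; they follow the defaults set in hillviewCommon.py, with an assumed aggregator port and Tomcat directory:

SERVICE_DIRECTORY=/home/hillview
HEAPSIZE="5G"
USER=hillview
WORKER_PORT=3569
AGGREGATOR_PORT=3570
CLEANUP=0
TOMCAT=apache-tomcat-9.0.4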
diff --git a/bin/forever.sh b/bin/forever.sh
new file mode 100755
index 000000000..d68205768
--- /dev/null
+++ b/bin/forever.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+# Runs another process in a loop
+
+if [ $# -le 1 ]; then
+    echo "Usage: forever.sh pidfile command"
+    echo "Re-runs command every time it terminates"
+    echo "Writes its own pid in pidfile"
+    exit 1
+fi
+
+pidfile=$1
+shift
+
+echo "Running $1 forever"
+echo $$ >${pidfile}
+while /bin/true; do
+    "$@"
+    sleep 2
+    echo "Restarting..."
+done
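A minimal sketch of how forever.sh is meant to be invoked, mirroring the manager templates that follow (the jar path and port are hypothetical):

# Start a worker under forever.sh; the monitor writes its own pid into
# forever-worker.pid and relaunches the java process whenever it exits.
./forever.sh forever-worker.pid nohup java -server -jar hillview-server-jar-with-dependencies.jar 0.0.0.0:3569 >nohup.out 2>&1 &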
diff --git a/bin/frontend-start.sh b/bin/frontend-start.sh
index 497ad94e3..73f0faf3e 100755
--- a/bin/frontend-start.sh
+++ b/bin/frontend-start.sh
@@ -13,5 +13,5 @@ export JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=
 # LOGGING=" -Djava.util.logging.config.file=logging.properties"
 export JAVA_OPTS="$JAVA_OPTS$LOGGING"
 
-cd ${mydir}/../web # logs will always be in this folder
+cd ${mydir}
 ../apache-tomcat-${TOMCATVERSION}/bin/catalina.sh run
diff --git a/bin/hillview-aggregator-manager-template.sh b/bin/hillview-aggregator-manager-template.sh
new file mode 100644
index 000000000..c0f8c1dec
--- /dev/null
+++ b/bin/hillview-aggregator-manager-template.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+# This is a template for a script that manages a hillview aggregator as a Unix service.
+
+#REPLACE_WITH_VARIABLES
+
+FOREVERPID="forever-aggregator.pid"
+PIDFILE="hillview-aggregator.pid"
+
+cd ${SERVICE_DIRECTORY}
+
+start() {
+    if [ "x${CLEANUP}" == "x1" ]; then
+        rm -f hillview-agg.log hillview-agg.log.* hillview-agg.log*.lck
+    fi
+    ./forever.sh ${FOREVERPID} nohup java -ea -Dlog4j.configurationFile=./log4j.properties -server -Xmx${HEAPSIZE} -jar ${SERVICE_DIRECTORY}/hillview-server-jar-with-dependencies.jar ${SERVICE_DIRECTORY}/workers 0.0.0.0:${AGGREGATOR_PORT} >nohup.out 2>&1 &
+}
+
+killbypid() {
+    local PIDFILE=$1
+    if [ -f ${PIDFILE} ]; then
+        # First try to find the service pid
+        read LINE < ${SERVICE_DIRECTORY}/${PIDFILE}
+        if [ -d "/proc/${LINE}" ]; then
+            echo "Killing $2 process ${LINE}"
+            kill ${LINE}
+        fi
+        rm -f ${SERVICE_DIRECTORY}/${PIDFILE}
+        return 0
+    else
+        return 1
+    fi
+}
+
+stop() {
+    killbypid ${FOREVERPID} forever
+    killbypid ${PIDFILE} hillview-server
+}
+
+status() {
+    if [ -f ${PIDFILE} ]; then
+        read LINE < ${SERVICE_DIRECTORY}/${PIDFILE}
+        if [ -d "/proc/${LINE}" ]; then
+            echo "Process seems to be running"
+            return 0
+        fi
+    fi
+    echo "Could not find running aggregator"
+}
+
+case ${1} in
+    start)
+        start
+        ;;
+    stop)
+        stop
+        ;;
+    restart)
+        stop
+        start
+        ;;
+    status)
+        status
+        ;;
+    *)
+        echo "Usage: $0 {start|stop|restart|status}"
+esac
diff --git a/bin/hillview-webserver-manager-template.sh b/bin/hillview-webserver-manager-template.sh
new file mode 100644
index 000000000..d5d1f432d
--- /dev/null
+++ b/bin/hillview-webserver-manager-template.sh
@@ -0,0 +1,55 @@
+#!/bin/bash
+# This is a template for a script that manages a hillview web server as a service.
+
+#REPLACE_WITH_VARIABLES
+
+# File storing PID of the hillview web server
+PIDFILE="hillview-webserver.pid"
+cd ${SERVICE_DIRECTORY}
+
+start() {
+    if [ "x${CLEANUP}" == "x1" ]; then
+        rm -f hillview-web.log hillview-web.log.* hillview-web.log*.lck
+    fi
+    export WEB_CLUSTER_DESCRIPTOR=serverlist
+    nohup ./${TOMCAT}/bin/startup.sh &
+}
+
+stop() {
+    if pgrep -f tomcat; then
+        ${SERVICE_DIRECTORY}"/"${TOMCAT}/bin/shutdown.sh
+        echo Stopped
+    else
+        echo "Web server already stopped"
+    fi
+    pkill -f Bootstrap
+    true
+}
+
+status() {
+    # This assumes there's a single tomcat on the machine, which may not be true...
+    if ! pgrep -f tomcat; then
+        echo "Web server not running"
+    else
+        echo "Web server running"
+    fi
+    true
+}
+
+case ${1} in
+    start)
+        start
+        ;;
+    stop)
+        stop
+        ;;
+    restart)
+        stop
+        start
+        ;;
+    status)
+        status
+        ;;
+    *)
+        echo "Usage: $0 {start|stop|restart|status}"
+esac
diff --git a/bin/hillview-worker-manager-template.sh b/bin/hillview-worker-manager-template.sh
new file mode 100644
index 000000000..c4279727c
--- /dev/null
+++ b/bin/hillview-worker-manager-template.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+# This is a template for a script that manages a hillview worker as a service.
+
+#REPLACE_WITH_VARIABLES
+
+# File storing PID of the hillview worker
+PIDFILE="hillview-worker.pid"
+# File storing PID of the forever process that runs the hillview worker
+FOREVERPID="forever-worker.pid"
+
+cd ${SERVICE_DIRECTORY}
+
+start() {
+    echo "Starting worker"
+    if [ "x${CLEANUP}" == "x1" ]; then
+        rm -f hillview.log hillview.log.* hillview.log*.lck
+    fi
+    ./forever.sh ${FOREVERPID} nohup java -ea -Dlog4j.configurationFile=./log4j.properties -server -Xmx${HEAPSIZE} -jar ${SERVICE_DIRECTORY}/hillview-server-jar-with-dependencies.jar 0.0.0.0:${WORKER_PORT} >nohup.out 2>&1 &
+}
+
+killbypid() {
+    local PIDFILE=$1
+    if [ -f ${PIDFILE} ]; then
+        # First try to find the service pid
+        read LINE < ${SERVICE_DIRECTORY}/${PIDFILE}
+        if [ -d "/proc/${LINE}" ]; then
+            echo "Killing $2 process ${LINE}"
+            kill ${LINE}
+        fi
+        rm -f ${SERVICE_DIRECTORY}/${PIDFILE}
+        return 0
+    else
+        return 1
+    fi
+}
+
+stop() {
+    killbypid ${FOREVERPID} forever
+    killbypid ${PIDFILE} hillview-server
+    if [ $? -ne 0 ]; then
+        # Kill it by name; may have collateral damage
+        if pgrep -f hillview-server; then
+            pkill -f hillview-server
+            echo "Stopped"
+        else
+            echo "Already stopped"
+        fi
+    fi
+    true
+}
+
+status() {
+    if [ -f ${PIDFILE} ]; then
+        read LINE < ${PIDFILE}
+        if [ -d "/proc/${LINE}" ]; then
+            echo "Process seems to be running"
+            return 0
+        fi
+    fi
+    echo "Could not find running worker"
+}
+
+case ${1} in
+    start)
+        start
+        ;;
+    stop)
+        stop
+        ;;
+    restart)
+        stop
+        start
+        ;;
+    status)
+        status
+        ;;
+    *)
+        echo "Usage: $0 {start|stop|restart|status}"
+esac
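Taken together, the manager templates above behave like conventional service scripts; a hypothetical session on a worker host (paths taken from the assumed defaults) might look like:

cd /home/hillview                    # SERVICE_DIRECTORY
./hillview-worker-manager.sh start   # launches forever.sh, which keeps the worker JVM alive
./hillview-worker-manager.sh status  # reports whether the pid in hillview-worker.pid is live
./hillview-worker-manager.sh stop    # kills the forever.sh monitor first, then the worker

Note that stop kills the forever.sh monitor before the server itself; in the opposite order the worker would simply be relaunched.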
hillview service""" @@ -146,7 +156,7 @@ def get_user(self): def _get_heap_size(self, hostname): """The heap size used for the specified host""" - if hostname in self.jsonConfig.workers_heapsize: + if hasattr(self.jsonConfig, "workers_heapsize") and hostname in self.jsonConfig.workers_heapsize: return self.jsonConfig.workers_heapsize[hostname] return self.jsonConfig.default_heap_size @@ -178,21 +188,17 @@ def cleanup_on_install(self): """Returns true if we need to cleaup when installing""" return self.jsonConfig.cleanup - def run_on_all_aggregators(self, function, parallel=True): + def run_on_all_aggregators(self, function): # pylint: disable=unused-argument """Run a lambda on all aggregators. function is a lambda that takes a - RemoteHost object as an argument. If parallel is True the function is - run concurrently.""" + RemoteHost object as an argument.""" for rh in self.get_aggregators(): function(rh) - def run_on_all_workers(self, function, parallel=True): + def run_on_all_workers(self, function): # pylint: disable=unused-argument """Run a lambda on all workers. function is a lambda that takes a - RemoteHost object as an argument. If parallel is True the function is - run concurrently.""" - # Unfortunately there seems to be no way to reliably - # run something in parallel in Python, so this is not working yet. + RemoteHost object as an argument.""" for rh in self.get_workers(): function(rh) diff --git a/bin/run-on-all.py b/bin/run-on-all.py index 27d33818a..2f81f7937 100755 --- a/bin/run-on-all.py +++ b/bin/run-on-all.py @@ -10,13 +10,13 @@ logger = get_logger("run-on-all") -def execute_command_on_all(config, command, parallel): +def execute_command_on_all(config, command): """Executes command on all workers""" assert isinstance(config, ClusterConfiguration) message = "Executing `" + str(command) + "' on " + str(len(config.get_workers())) + " hosts" logger.info(message) lam = lambda rh: rh.run_remote_shell_command(command) - config.run_on_all_workers(lam, parallel) + config.run_on_all_workers(lam) def main(): """Main function""" @@ -26,7 +26,7 @@ def main(): args = parser.parse_args() config = get_config(parser, args) command = " ".join(args.command) - execute_command_on_all(config, command, False) + execute_command_on_all(config, command) if __name__ == "__main__": main() diff --git a/bin/start.py b/bin/start.py index e58ca8e37..015692b01 100755 --- a/bin/start.py +++ b/bin/start.py @@ -16,9 +16,7 @@ def start_webserver(config): rh = config.get_webserver() message = "Starting web server " + str(rh) logger.info(message) - rh.run_remote_shell_command( - "export WEB_CLUSTER_DESCRIPTOR=serverlist; cd " + config.service_folder + "; nohup " + \ - config.tomcat + "/bin/startup.sh &") + rh.run_remote_shell_command(config.service_folder + "/hillview-webserver-manager.sh start") def start_worker(config, rh): """Starts the Hillview worker on a remote machine""" @@ -26,14 +24,7 @@ def start_worker(config, rh): assert isinstance(config, ClusterConfiguration) message = "Starting worker " + str(rh) logger.info(message) - gclog = config.service_folder + "/hillview/gc.log" - rh.run_remote_shell_command( - "cd " + config.service_folder + "/hillview; " + \ - "nohup java -ea -Dlog4j.configurationFile=./log4j.properties -server " + \ - " -Xmx" + rh.heapsize + " -Xloggc:" + gclog + \ - " -jar " + config.service_folder + \ - "/hillview/hillview-server-jar-with-dependencies.jar " + "0.0.0.0:" + \ - str(config.worker_port) + " >nohup.out 2>&1 &") + rh.run_remote_shell_command(config.service_folder + 
"/hillview-worker-manager.sh start") def start_aggregator(config, agg): """Starts a Hillview aggregator""" @@ -41,23 +32,7 @@ def start_aggregator(config, agg): assert isinstance(config, ClusterConfiguration) message = "Starting aggregator " + str(agg) logger.info(message) - agg.run_remote_shell_command( - "cd " + config.service_folder + "/hillview; " + \ - "nohup java -ea -Dlog4j.configurationFile=./log4j.properties -server " + \ - " -jar " + config.service_folder + \ - "/hillview/hillview-server-jar-with-dependencies.jar " + \ - config.service_folder + "/workers " + agg.host + ":" + \ - str(config.aggregator_port) + " >nohup.agg 2>&1 &") - -def start_aggregators(config): - """Starts all Hillview aggregators""" - assert isinstance(config, ClusterConfiguration) - config.run_on_all_aggregators(lambda rh: start_aggregator(config, rh)) - -def start_workers(config): - """Starts all Hillview workers""" - assert isinstance(config, ClusterConfiguration) - config.run_on_all_workers(lambda rh: start_worker(config, rh)) + rh.run_remote_shell_command(config.service_folder + "/hillview-aggregator-manager.sh start") def main(): """Main function""" @@ -66,8 +41,8 @@ def main(): args = parser.parse_args() config = get_config(parser, args) start_webserver(config) - start_workers(config) - start_aggregators(config) + config.run_on_all_aggregators(lambda rh: start_aggregator(config, rh)) + config.run_on_all_workers(lambda rh: start_worker(config, rh)) if __name__ == "__main__": main() diff --git a/bin/status.py b/bin/status.py index 7b85aca84..7665ff503 100755 --- a/bin/status.py +++ b/bin/status.py @@ -16,9 +16,7 @@ def check_webserver(config): rh = config.get_webserver() message = "Checking hillview status on " + str(rh) logger.info(message) - rh.run_remote_shell_command("if pgrep -f tomcat; then true; else " + - " echo \"Web server not running on " + str(rh.host) +"\"; " + - " false; fi") + rh.run_remote_shell_command(config.service_folder + "/hillview-webserver-manager.sh status") def check_worker(config, rh): """Checks if the Hillview service is running on a remote machine""" @@ -26,15 +24,15 @@ def check_worker(config, rh): assert isinstance(rh, RemoteHost) message = "Checking hillview status on " + str(rh.host) logger.info(message) - rh.run_remote_shell_command("if pgrep -f hillview-server; then true; else " + - " echo \"Hillview not running on " + str(rh.host) +"\"; " + - " cat " + config.service_folder + "/hillview/nohup.out; false; fi") + rh.run_remote_shell_command(config.service_folder + "/hillview-worker-manager.sh status") -def check_workers(config): - """Checks all Hillview workers and aggregators""" +def check_aggregator(config, rh): + """Checks if the Hillview service is running on a remote machine""" assert isinstance(config, ClusterConfiguration) - config.run_on_all_aggregators(lambda rh: check_worker(config, rh)) - config.run_on_all_workers(lambda rh: check_worker(config, rh)) + assert isinstance(rh, RemoteHost) + message = "Checking hillview status on " + str(rh.host) + logger.info(message) + rh.run_remote_shell_command(config.service_folder + "/hillview-aggregator-manager.sh status") def main(): """Main function""" @@ -43,7 +41,8 @@ def main(): args = parser.parse_args() config = get_config(parser, args) check_webserver(config) - check_workers(config) + config.run_on_all_aggregators(lambda rh: check_aggregator(config, rh)) + config.run_on_all_workers(lambda rh: check_worker(config, rh)) if __name__ == "__main__": main() diff --git a/bin/stop.py b/bin/stop.py index d44bbdc10..defdc5725 
diff --git a/platform/src/main/java/org/hillview/main/DemoDataCleaner.java b/platform/src/main/java/org/hillview/main/DemoDataCleaner.java
index f999ea73c..73bf4d41c 100644
--- a/platform/src/main/java/org/hillview/main/DemoDataCleaner.java
+++ b/platform/src/main/java/org/hillview/main/DemoDataCleaner.java
@@ -28,31 +28,19 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.stream.Stream;
 
 /**
  * This entry point is only used for preparing some data files for a demo.
  * It takes files named like data/ontime/On_Time_On_Time_Performance_*_*.csv and
- * removes some columns from them.  Optionally, it can also split these into
- * smaller files each.
+ * removes some columns from them and converts them to the ORC format.
  */
 class DemoDataCleaner {
     private static final String dataFolder = "../data/ontime";
 
     public static void main(String[] args) throws IOException {
         HillviewLogger.initialize("data cleaner", "hillview.log");
-        Set<String> columns = new HashSet<String>();
-        Collections.addAll(columns,
-                "DayOfWeek", "FlightDate", "UniqueCarrier",
-                "Origin", "OriginCityName", "OriginState", "Dest", "DestState",
-                "DepTime", "DepDelay", "ArrTime", "ArrDelay", "Cancelled",
-                "ActualElapsedTime", "Distance");
-
-        System.out.println("Splitting files in folder " + dataFolder);
         String prefix = "On_Time_On_Time_Performance_";
         Path folder = Paths.get(dataFolder);
         Stream<Path> files = Files.walk(folder, 1);
@@ -79,11 +67,14 @@ public static void main(String[] args) throws IOException {
             String end = filename.replace(prefix, "");
             if (end.endsWith(".gz"))
                 // the output is uncompressed
-                end = end.replace(".gz", "");
-            CsvFileWriter writer = new CsvFileWriter(end);
-            System.out.println("Writing " + end);
-            writer.writeTable(p);
-            /*
+                end = end.replace(".gz", "");
+            if (!Files.exists(Paths.get(end))) {
+                CsvFileWriter writer = new CsvFileWriter(end);
+                System.out.println("Writing " + end);
+                writer.writeTable(p);
+            }
+
+            end = end.replace(".csv", ".orc");
             OrcFileWriter owriter = new OrcFileWriter(end);
             System.out.println("Writing " + end);
             owriter.writeTable(p);
@@ -92,7 +83,6 @@ public static void main(String[] args) throws IOException {
             owriter = new OrcFileWriter(big);
             System.out.println("Writing " + big);
             owriter.writeTable(tbl);
-            */
         });
     }
 }
diff --git a/platform/src/main/java/org/hillview/main/HillviewServerRunner.java b/platform/src/main/java/org/hillview/main/HillviewServerRunner.java
index fd56954ea..dea509eaa 100644
--- a/platform/src/main/java/org/hillview/main/HillviewServerRunner.java
+++ b/platform/src/main/java/org/hillview/main/HillviewServerRunner.java
@@ -26,6 +26,9 @@
 import org.hillview.utils.HillviewLogger;
 import org.hillview.utils.HostAndPort;
 
+import java.io.FileWriter;
+import java.lang.management.ManagementFactory;
+
 /**
  * Starts a Hillview server.  Depending on the command-line arguments it
  * will start either a worker node, or an aggregator node that talks to many worker nodes.
@@ -37,7 +40,9 @@ private static void usage() {
             "`host:port` is the address where the service receives requests.\n" +
             "`children` is an optional file that contains a list of children nodes.\n" +
             "   Each child is of the form host:port.\n" +
-            "   If present this server will work as an aggregator node.\n"
+            "   If present this server will work as an aggregator node.\n" +
+            "The process writes its process id into a file named hillview-worker.pid\n" +
+            "or hillview-aggregator.pid\n"
         );
     }
 
@@ -45,18 +50,21 @@ public static void main(String[] args) {
         try {
             String hostAndPort;
             IDataSet<Empty> initial;
+            String pidfile;
             if (args.length == 1) {
                 // worker node
                 HillviewLogger.initialize("worker", "hillview.log");
                 initial = new LocalDataSet<Empty>(Empty.getInstance());
                 hostAndPort = args[0];
+                pidfile = "hillview-worker.pid";
             } else if (args.length == 2) {
                 // aggregator node
                 HillviewLogger.initialize("aggregator", "hillview-agg.log");
                 HostList cluster = HostList.fromFile(args[0]);
                 initial = RemoteDataSet.createCluster(cluster, RemoteDataSet.defaultDatasetIndex);
                 hostAndPort = args[1];
+                pidfile = "hillview-aggregator.pid";
             } else {
                 usage();
                 return;
@@ -64,6 +72,23 @@ public static void main(String[] args) {
             }
             new HillviewServer(HostAndPort.fromString(hostAndPort), initial);
             HillviewLogger.instance.info("Created HillviewServer");
+
+            // Try to find our own PID
+            String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+            int index = jvmName.indexOf('@');
+            if (index > 0) {
+                try {
+                    long pid = Long.parseLong(jvmName.substring(0, index));
+                    FileWriter writer = new FileWriter(pidfile, false);
+                    writer.write(Long.toString(pid));
+                    writer.close();
+                } catch (NumberFormatException e) {
+                    HillviewLogger.instance.warn("Cannot determine own pid");
+                }
+            } else {
+                HillviewLogger.instance.warn("Cannot determine own pid");
+            }
+
             Thread.currentThread().join();
         } catch (Exception ex) {
             HillviewLogger.instance.error("Caught exception; exiting", ex);
diff --git a/platform/src/main/java/org/hillview/sketches/FileSizeSketch.java b/platform/src/main/java/org/hillview/sketches/FileSizeSketch.java
index 3eceda416..e6334f97d 100644
--- a/platform/src/main/java/org/hillview/sketches/FileSizeSketch.java
+++ b/platform/src/main/java/org/hillview/sketches/FileSizeSketch.java
@@ -20,12 +20,14 @@
 import org.hillview.dataset.api.IJson;
 import org.hillview.dataset.api.ISketch;
 import org.hillview.storage.IFileReference;
+import org.hillview.utils.Converters;
 
 import javax.annotation.Nullable;
 
 public class FileSizeSketch implements ISketch<IFileReference, FileSizeSketch.Info> {
     @Override
-    public Info create(IFileReference data) {
+    public Info create(@Nullable IFileReference data) {
+        Converters.checkNull(data);
         return new Info(1, data.getSizeInBytes());
     }
diff --git a/platform/src/main/java/org/hillview/table/Schema.java b/platform/src/main/java/org/hillview/table/Schema.java
index d790cf880..44e0a7c8b 100644
--- a/platform/src/main/java/org/hillview/table/Schema.java
+++ b/platform/src/main/java/org/hillview/table/Schema.java
@@ -221,7 +221,7 @@ public String newColumnName(@Nullable String prefix) {
         while (true) {
             if (!this.columns.containsKey(name))
                 return name;
-            name = prefix + Integer.toString(i);
+            name = prefix + i;
             ++i;
         }
     }
diff --git a/web/src/main/webapp/dataViews/histogramView.ts b/web/src/main/webapp/dataViews/histogramView.ts
index f5402a74e..7f211f129 100644
--- a/web/src/main/webapp/dataViews/histogramView.ts
+++ b/web/src/main/webapp/dataViews/histogramView.ts
@@ -517,7 +517,10 @@ export class HistogramView extends HistogramViewBase implements IScrollTarget {
         const title = new PageTitle("Filtered " + this.schema.displayName(this.xAxisData.description.name));
         const renderer = new FilterReceiver(title, [this.xAxisData.description], this.schema,
             [0], this.page, rr, this.dataset, {
-            exact: this.samplingRate >= 1, reusePage: false, relative: false, chartKind: "Histogram"
+            exact: this.samplingRate >= 1,
+            reusePage: false,
+            relative: false,
+            chartKind: "Histogram"
         });
         rr.invoke(renderer);
     }
diff --git a/web/src/main/webapp/loadMenu.ts b/web/src/main/webapp/loadMenu.ts
index 5cef6b42b..dab3b112e 100644
--- a/web/src/main/webapp/loadMenu.ts
+++ b/web/src/main/webapp/loadMenu.ts
@@ -52,7 +52,7 @@ export class LoadMenu extends RemoteObject implements IDataView {
             { text: "Flights (15 columns, CSV)",
               action: () => {
                 const files: FileSetDescription = {
-                    fileNamePattern: "../data/ontime/????_*.csv*",
+                    fileNamePattern: "data/ontime/????_*.csv*",
                     schemaFile: "short.schema",
                     headerRow: true,
                     repeat: 1,
@@ -69,7 +69,7 @@ export class LoadMenu extends RemoteObject implements IDataView {
             { text: "Flights (15 columns, ORC)",
               action: () => {
                 const files: FileSetDescription = {
-                    fileNamePattern: "../data/ontime_small_orc/*.orc",
+                    fileNamePattern: "data/ontime_small_orc/*.orc",
                     schemaFile: "schema",
                     headerRow: true,
                     repeat: 1,
@@ -86,7 +86,7 @@ export class LoadMenu extends RemoteObject implements IDataView {
             { text: "Flights (all columns, ORC)",
               action: () => {
                 const files: FileSetDescription = {
-                    fileNamePattern: "../data/ontime_big_orc/*.orc",
+                    fileNamePattern: "data/ontime_big_orc/*.orc",
                     schemaFile: "schema",
                     headerRow: true,
                     repeat: 1,
diff --git a/web/src/main/webapp/tableTarget.ts b/web/src/main/webapp/tableTarget.ts
index 1dc4169fa..e8d2e62c5 100644
--- a/web/src/main/webapp/tableTarget.ts
+++ b/web/src/main/webapp/tableTarget.ts
@@ -312,8 +312,7 @@ RpcRequest<PartialResult<RemoteObjectId>> {
     public createFilter2DRequest(xRange: FilterDescription, yRange: FilterDescription):
     RpcRequest<PartialResult<RemoteObjectId>> {
-        return this.createStreamingRpcRequest<RemoteObjectId>("filter2DRange",
-            {first: xRange, second: yRange});
+        return this.createStreamingRpcRequest<RemoteObjectId>("filter2DRange", { first: xRange, second: yRange });
     }
 
     public createHistogram2DRequest(info: HistogramArgs[]):
@@ -354,7 +353,7 @@ RpcRequest<PartialResult<RemoteObjectId>> {
     RpcRequest<PartialResult<RemoteObjectId>> {
         return this.createStreamingRpcRequest<RemoteObjectId>("categoricalCentroidsControlPoints", {
             categoricalColumnName: categoricalColumnName,
-            numericalColumnNames: numericalColumnNames} );
+            numericalColumnNames: numericalColumnNames } );
     }
 
     public createMDSProjectionRequest(id: RemoteObjectId): RpcRequest<PartialResult<RemoteObjectId>> {
@@ -367,7 +366,7 @@ RpcRequest<PartialResult<RemoteObjectId>> {
     RpcRequest<PartialResult<RemoteObjectId>> {
         return this.createStreamingRpcRequest<RemoteObjectId>("lampMap", {controlPointsId: controlPointsId,
             colNames: colNames,
-            newLowDimControlPoints: controlPoints, newColNames: newColNames});
+            newLowDimControlPoints: controlPoints, newColNames: newColNames });
     }
 }