From 51fea5705b76bdd1798833dd44968113b4bcf22a Mon Sep 17 00:00:00 2001 From: James McMullan Date: Tue, 24 Sep 2024 09:16:33 -0400 Subject: [PATCH] Code review changes --- .../hpccsystems/dfs/client/FileUtility.java | 40 ++++++++----------- .../dfs/client/RowServiceInputStream.java | 4 +- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 0efa90d20..6a13f9424 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -640,7 +640,8 @@ private static Options getReadOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); - options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("read") .argName("files") @@ -661,14 +662,16 @@ private static Options getReadTestOptions() options.addOption("pass", true, "Specifies the password used to connect. Defaults to null."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("access_expiry_seconds", true, "Access token expiration seconds."); - options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice."); + options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice," + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice."); options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice."); options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster."); options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); - options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("file_parts") .argName("_file_parts") @@ -692,7 +695,8 @@ private static Options getCopyOptions() options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files."); options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); - options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("copy") .argName("files") @@ -715,7 +719,8 @@ private static Options getWriteOptions() options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to."); options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds."); - options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."); + options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently." + + " useful in cases where starting up connections too quickly can overwhelm intermediate processes."); options.addOption(Option.builder("write") .argName("files") @@ -954,13 +959,11 @@ public void run() fileReader.getInputStream().setReadRequestDelay(readRequestDelay); fileReader.setMaxReadRetries(context.readRetries); - long recCount = 0; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - recCount++; + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.getCurrentOperation().recordsRead.addAndGet(recCount); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -1010,15 +1013,13 @@ public void run() { try { - long recCount = 0; while (fileReader.hasNext()) { splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - recCount++; + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.getCurrentOperation().recordsRead.addAndGet(recCount); splitTable.finish(fileReader.getStreamPosition()); @@ -1141,18 +1142,14 @@ public void run() { for (int k = 0; k < fileReaders.length; k++) { - long recordsRead = 0; - long recordsWritten = 0; HpccRemoteFileReader fileReader = fileReaders[k]; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - recordsRead++; - recordsWritten++; + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); - context.getCurrentOperation().recordsRead.addAndGet(recordsRead); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -1305,19 +1302,14 @@ public void run() splitEnd = endingSplit.splitEnd; } - long recordsRead = 0; - long recordsWritten = 0; while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd) { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - recordsRead++; - recordsWritten++; + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); - context.getCurrentOperation().recordsRead.addAndGet(recordsRead); - context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); } diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 7fe3c0ae0..0b6049566 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -114,11 +114,11 @@ private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, i public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25; public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations - public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 10; + public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 100; // Note: The platform may respond with more data than this if records are larger than this limit. public static final int DEFAULT_MAX_READ_SIZE_KB = 4096; - public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 32; + public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 256; private static final int SHORT_SLEEP_MS = 1; private static final int LONG_WAIT_THRESHOLD_US = 100;