From 61580c9ccf5a3a08c6244a75565cb8c2ff8f6ad9 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 19 Jul 2024 14:02:49 -0400 Subject: [PATCH] Code review changes --- .../hpccsystems/dfs/client/FileUtility.java | 152 ++++++++++-------- .../org/hpccsystems/dfs/client/Utils.java | 23 +-- 2 files changed, 96 insertions(+), 79 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 28e4311c5..116c37c21 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -81,7 +81,7 @@ private static class TaskContext private static class TaskOperation { public String currentOperationDesc = ""; - public long operationStart = 0; + public long operationStartNS = 0; public List errorMessages = new ArrayList(); public List warnMessages = new ArrayList(); @@ -92,23 +92,23 @@ private static class TaskOperation public AtomicLong bytesRead = new AtomicLong(0); public AtomicLong bytesWritten = new AtomicLong(0); - public Span taskSpan = null; + public Span operationSpan = null; public JSONObject end(boolean success) { if (success) { - taskSpan.setStatus(StatusCode.OK); + operationSpan.setStatus(StatusCode.OK); } else { - taskSpan.setStatus(StatusCode.ERROR); + operationSpan.setStatus(StatusCode.ERROR); } - taskSpan.end(); + operationSpan.end(); long totalOperationTime = System.nanoTime(); - totalOperationTime -= operationStart; + totalOperationTime -= operationStartNS; double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; @@ -152,42 +152,56 @@ public JSONObject end(boolean success) private Stack operations = new Stack(); public List operationResults = new ArrayList(); - public void setTaskSpanAttributes(Attributes attributes) + public void setCurrentOperationSpanAttributes(Attributes attributes) { - if (operations.empty()) + if (!hasCurrentOperation()) { return; } - TaskOperation op = operations.peek(); + TaskOperation op = getCurrentOperation(); - synchronized(op.taskSpan) + synchronized(op.operationSpan) { - op.taskSpan.setAllAttributes(attributes); + op.operationSpan.setAllAttributes(attributes); } } - public void makeTaskSpanCurrent() + public void addCurrentOperationSpanAttribute(AttributeKey key, Object value) { - if (operations.empty()) + if (!hasCurrentOperation()) { return; } - TaskOperation op = operations.peek(); + TaskOperation op = getCurrentOperation(); - synchronized(op.taskSpan) + synchronized(op.operationSpan) { - op.taskSpan.makeCurrent(); + op.operationSpan.setAttribute(key, value); + } + } + + public void makeCurrentOperationSpanCurrent() + { + if (!hasCurrentOperation()) + { + return; + } + TaskOperation op = getCurrentOperation(); + + synchronized(op.operationSpan) + { + op.operationSpan.makeCurrent(); } } public boolean hasError() { - if (operations.empty()) + if (!hasCurrentOperation()) { return false; } - TaskOperation op = operations.peek(); + TaskOperation op = getCurrentOperation(); boolean err = false; synchronized(op.errorMessages) @@ -200,47 +214,47 @@ public boolean hasError() public void addError(String error) { - if (operations.empty()) + if (!hasCurrentOperation()) { return; } - TaskOperation op = operations.peek(); + TaskOperation op = getCurrentOperation(); synchronized(op.errorMessages) { op.errorMessages.add(error); } - synchronized(op.taskSpan) + synchronized(op.operationSpan) { - op.taskSpan.recordException(new Exception(error)); + op.operationSpan.recordException(new Exception(error)); } } public void addWarn(String warn) { - if (operations.empty()) + if (!hasCurrentOperation()) { return; } - TaskOperation op = operations.peek(); + TaskOperation op = getCurrentOperation(); synchronized(op.warnMessages) { op.warnMessages.add(warn); } - synchronized(op.taskSpan) + synchronized(op.operationSpan) { - op.taskSpan.addEvent(warn); + op.operationSpan.addEvent(warn); } } - public boolean hasOperation() + public boolean hasCurrentOperation() { - if (operations.empty()) + if (operations.isEmpty()) { return false; } @@ -248,9 +262,9 @@ public boolean hasOperation() return true; } - public TaskOperation getOperation() + public TaskOperation getCurrentOperation() { - if (operations.empty()) + if (!hasCurrentOperation()) { return null; } @@ -258,22 +272,26 @@ public TaskOperation getOperation() return operations.peek(); } + private void setCurrentOperation(TaskOperation op) + { + operations.push(op); + } + public void startOperation(String operationName) { TaskOperation op = new TaskOperation(); op.currentOperationDesc = operationName; - op.operationStart = System.nanoTime(); + op.operationStartNS = System.nanoTime(); Span parentSpan = null; - TaskOperation prevOp = getOperation(); + TaskOperation prevOp = getCurrentOperation(); if (prevOp != null) { - parentSpan = prevOp.taskSpan; + parentSpan = prevOp.operationSpan; } - op.taskSpan = Utils.createChildSpan(parentSpan, operationName); - - operations.push(op); + op.operationSpan = Utils.createChildSpan(parentSpan, operationName); + setCurrentOperation(op); } public void endOperation() @@ -283,14 +301,12 @@ public void endOperation() public void endOperation(boolean success) { - if (!hasOperation()) + if (!hasCurrentOperation()) { return; } - TaskOperation op = getOperation(); - JSONObject results = op.end(success); - operationResults.add(results); + operationResults.add(getCurrentOperation().end(success)); operations.pop(); } @@ -764,9 +780,9 @@ private static void executeTasks(Runnable[] tasks, int numThreads, TaskContext c public void run() { - // Make the task span is current for the thread, otherwise spans created + // Make sure the span is current for the thread, otherwise spans created // within this thread will not be children of the task span - context.makeTaskSpanCurrent(); + context.makeCurrentOperationSpanCurrent(); for (int j = 0; j < numSubTasks; j++) { @@ -800,18 +816,18 @@ public void run() try { HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); - readContext.parentSpan = context.getOperation().taskSpan; + readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - context.getOperation().recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } fileReader.close(); - context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); } catch (Exception e) { @@ -833,7 +849,7 @@ private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, Split final int taskIndex = i; HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); - readContext.parentSpan = context.getOperation().taskSpan; + readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(readContext, fileParts[taskIndex], new HPCCRecordBuilder(recordDef)); @@ -859,13 +875,13 @@ public void run() splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getOperation().recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } splitTable.finish(fileReader.getStreamPosition()); fileReader.close(); - context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); fileWriter.finalize(); outputStream.close(); @@ -956,7 +972,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre { DataPartition inFilePart = inFileParts[incomingFilePartIndex + j]; HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); - readContext.parentSpan = context.getOperation().taskSpan; + readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; filePartReaders[j] = new HpccRemoteFileReader(readContext, inFilePart, new HPCCRecordBuilder(recordDef)); } @@ -964,7 +980,7 @@ private static Runnable[] createNonRedistributingCopyTasks(HPCCFile file, DFUCre HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); - writeContext.parentSpan = context.getOperation().taskSpan; + writeContext.parentSpan = context.getCurrentOperation().operationSpan; writeContext.recordDef = recordDef; writeContext.fileCompression = CompressionAlgorithm.NONE; final HPCCRemoteFileWriter partFileWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); @@ -985,16 +1001,16 @@ public void run() { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getOperation().recordsWritten.incrementAndGet(); - context.getOperation().recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } fileReader.close(); - context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); } System.out.println("Closing file writer for task: " + taskIndex); fileWriter.close(); - context.getOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getCurrentOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -1093,7 +1109,7 @@ private static Runnable[] createWriteTasks(String[] srcFiles, SplitTable[] split HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); HPCCRemoteFileWriter.FileWriteContext writeContext = new HPCCRemoteFileWriter.FileWriteContext(); - writeContext.parentSpan = context.getOperation().taskSpan; + writeContext.parentSpan = context.getCurrentOperation().operationSpan; writeContext.recordDef = recordDef; writeContext.fileCompression = CompressionAlgorithm.NONE; HPCCRemoteFileWriter filePartWriter = new HPCCRemoteFileWriter(writeContext, outFilePart, recordAccessor); @@ -1143,15 +1159,15 @@ public void run() { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.getOperation().recordsWritten.incrementAndGet(); - context.getOperation().recordsRead.incrementAndGet(); + context.getCurrentOperation().recordsWritten.incrementAndGet(); + context.getCurrentOperation().recordsRead.incrementAndGet(); } - context.getOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); } fileWriter.close(); - context.getOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); + context.getCurrentOperation().bytesWritten.addAndGet(fileWriter.getBytesWritten()); } catch (Exception e) { @@ -1224,7 +1240,7 @@ private static void performRead(String[] args, TaskContext context) { String datasetName = datasets[i]; context.startOperation("FileUtility.Read_" + datasetName); - context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1412,7 +1428,7 @@ private static void performReadTest(String[] args, TaskContext context) String datasetName = cmd.getOptionValue("read_test"); context.startOperation("FileUtility.ReadTest_" + datasetName); - context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.url"), connString)); HPCCFile file = null; try @@ -1596,7 +1612,7 @@ private static void performCopy(String[] args, TaskContext context) String destFile = copyPairs[i+1]; context.startOperation("FileUtility.Copy_ " + srcFile + " -> " + destFile); - context.setTaskSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL, + context.setCurrentOperationSpanAttributes(Attributes.of(AttributeKey.stringKey("server.src.url"), srcURL, AttributeKey.stringKey("server.dest.url"), destURL)); HPCCFile file = null; @@ -1667,8 +1683,8 @@ private static void performCopy(String[] args, TaskContext context) try { - long bytesWritten = context.getOperation().bytesWritten.get(); - long recordsWritten = context.getOperation().recordsWritten.get(); + long bytesWritten = context.getCurrentOperation().bytesWritten.get(); + long recordsWritten = context.getCurrentOperation().recordsWritten.get(); dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true); } catch (Exception e) @@ -1779,7 +1795,7 @@ private static void performWrite(String[] args, TaskContext context) context.startOperation( "FileUtility.Write_" + srcFile + "_to_" + destFile); Attributes attributes = Attributes.of(AttributeKey.stringKey("server.url"), destURL); - context.setTaskSpanAttributes(attributes); + context.setCurrentOperationSpanAttributes(attributes); SplitTable[] splitTables = null; String[] srcFiles = null; @@ -1894,8 +1910,8 @@ private static void performWrite(String[] args, TaskContext context) try { - long bytesWritten = context.getOperation().bytesWritten.get(); - long recordsWritten = context.getOperation().recordsWritten.get(); + long bytesWritten = context.getCurrentOperation().bytesWritten.get(); + long recordsWritten = context.getCurrentOperation().recordsWritten.get(); dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, bytesWritten, true); } catch (Exception e) @@ -1975,7 +1991,7 @@ else if (cmd.hasOption("write")) } // If we are still in the middle of an operation there was a failure - if (context.hasOperation()) + if (context.hasCurrentOperation()) { boolean succeded = false; context.endOperation(succeded); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java index 04d7887ea..20f9b8607 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/Utils.java @@ -46,25 +46,26 @@ public static Tracer getTelemetryTracer() public static Span createSpan(String name) { - Span span = Utils.getTelemetryTracer().spanBuilder(name) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - span.makeCurrent(); - - return span; + return createChildSpan(null, name); } public static Span createChildSpan(Span parentSpan, String name) { + Span span = null; if (parentSpan == null) { - return createSpan(name); + span = Utils.getTelemetryTracer().spanBuilder(name) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + } + else + { + span = Utils.getTelemetryTracer().spanBuilder(name) + .setParent(Context.current().with(parentSpan)) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); } - Span span = Utils.getTelemetryTracer().spanBuilder(name) - .setParent(Context.current().with(parentSpan)) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); span.makeCurrent(); return span; }