Skip to content

Commit

Permalink
HPCC4J-611 Add OpenTelemetry tracing to dfsclient
Browse files Browse the repository at this point in the history
- Updated event attribute reporting
- Fixed trace structure in dfsclient protocol
- Added tracing to FileUtility

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Jul 10, 2024
1 parent f9922e2 commit fae1052
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.nio.file.Files;
import java.nio.file.Paths;

import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

import org.hpccsystems.commons.ecl.FieldDef;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -1777,6 +1779,8 @@ private static void performWrite(String[] args, TaskContext context)
*/
public static JSONArray run(String[] args)
{
AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();

Options options = getTopLevelOptions();
CommandLineParser parser = new DefaultParser();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,7 @@ private void setPrefetchException(HpccFileException e)

if (readSpan != null)
{
Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ExceptionAttributes.EXCEPTION_MESSAGE, e.getMessage());
readSpan.recordException(e, attributes);
}
Expand Down Expand Up @@ -725,8 +724,7 @@ public void startBlockingFetchRequest(List<Long> fetchOffsets) throws Exception
Exception wrappedException = new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode.");
if (readSpan != null)
{
Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
readSpan.recordException(wrappedException, attributes);
}
Expand Down Expand Up @@ -814,8 +812,7 @@ private int startFetch()

if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
AttributeKey.longKey("read.offset"), streamPos,
AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000));
readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
Expand Down Expand Up @@ -977,6 +974,9 @@ else if (this.handle == 0)
{
if (dataLen == 0)
{
// Read handle
dis.readInt();

close();
return 0;
}
Expand Down Expand Up @@ -1038,8 +1038,7 @@ private void readDataInFetch()
IOException wrappedException = new IOException(prefix + "Encountered unexpected end of stream mid fetch, this.dis.available() returned " + bytesToRead + " bytes.");
if (readSpan != null)
{
Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
readSpan.recordException(wrappedException, attributes);
}
Expand Down Expand Up @@ -1132,9 +1131,8 @@ private void finishFetch()

if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest));
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
AttributeKey.longKey("read.bytesRead"), Long.valueOf(totalDataInCurrentRequest));
readSpan.addEvent("RowServiceInputStream.readResponse", attributes);
}

Expand All @@ -1146,10 +1144,9 @@ private void finishFetch()
{
if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
AttributeKey.longKey("read.offset"), streamPos,
AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000));
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
AttributeKey.longKey("read.offset"), streamPos,
AttributeKey.longKey("read.size"), Long.valueOf(maxReadSizeKB*1000));
readSpan.addEvent("RowServiceInputStream.readRequest", attributes);
}

Expand Down Expand Up @@ -1335,8 +1332,7 @@ public int available() throws IOException
IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0");
if (readSpan != null)
{
Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
readSpan.recordException(wrappedException, attributes);
}
Expand Down Expand Up @@ -1679,8 +1675,7 @@ private void makeActive() throws HpccFileException

if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()));
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) );
readSpan.addEvent("RowServiceInputStream.connect", attributes);
}

Expand Down Expand Up @@ -1753,8 +1748,7 @@ private void makeActive() throws HpccFileException

if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()));
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()) );
readSpan.addEvent("RowServiceInputStream.versionRequest", attributes);
}

Expand Down Expand Up @@ -1795,9 +1789,8 @@ private void makeActive() throws HpccFileException

if (readSpan != null)
{
Attributes attributes = Attributes.of(ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
ServiceAttributes.SERVICE_VERSION, rowServiceVersion);
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ServiceAttributes.SERVICE_VERSION, rowServiceVersion);
readSpan.addEvent("RowServiceInputStream.versionResponse", attributes);
}
}
Expand Down Expand Up @@ -2127,7 +2120,7 @@ private void makeFetchObject(StringBuilder sb)

private String makeGetVersionRequest()
{
final String trace = traceContextHeader != null ? "\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n" : "";
final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
final String versionMsg = RFCCodes.RFCStreamReadCmd + "{ \"command\" : \"version\", \"handle\": \"-1\", " + trace + " \"format\": \"binary\" }";
return versionMsg;
}
Expand All @@ -2140,10 +2133,8 @@ private String makeInitialRequest()
sb.append("{ \"format\" : \"binary\", \n");
sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");

if (traceContextHeader != null)
{
sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n");
}
final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
sb.append(trace);

if (!useOldProtocol)
{
Expand Down Expand Up @@ -2213,10 +2204,8 @@ private String makeHandleRequest()
sb.append("{ \"format\" : \"binary\",\n");
sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\",");

if (traceContextHeader != null)
{
sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n");
}
final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
sb.append(trace);

if (!useOldProtocol)
{
Expand All @@ -2241,10 +2230,8 @@ private String makeTokenRequest()
sb.append("{ \"format\" : \"binary\",\n");
sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");

if (traceContextHeader != null)
{
sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n");
}
final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
sb.append(trace);

if (!useOldProtocol)
{
Expand All @@ -2266,10 +2253,8 @@ private String makeCloseHandleRequest()
sb.append("{ \"format\" : \"binary\",\n");
sb.append(" \"handle\" : \"" + Integer.toString(this.handle) + "\",");

if (traceContextHeader != null)
{
sb.append("\"_trace/traceparent\" : \"" + traceContextHeader + "\",\n");
}
final String trace = traceContextHeader != null ? "\"_trace\": { \"traceparent\" : \"" + traceContextHeader + "\" },\n" : "";
sb.append(trace);

sb.append(" \"command\" : \"close\"");
sb.append("\n}");
Expand Down Expand Up @@ -2311,8 +2296,7 @@ private void sendCloseFileRequest() throws IOException
IOException wrappedException = new IOException(prefix + "Failed to close file. Unable to read response with error: " + e.getMessage(), e);
if (readSpan != null)
{
Attributes attributes = Attributes.of( ServerAttributes.SERVER_ADDRESS, getIP(),
ServerAttributes.SERVER_PORT, Long.valueOf(getPort()),
Attributes attributes = Attributes.of( AttributeKey.longKey("server.index"), Long.valueOf(getFilePartCopy()),
ExceptionAttributes.EXCEPTION_MESSAGE, wrappedException.getMessage());
readSpan.recordException(wrappedException, attributes);
}
Expand Down
Loading

0 comments on commit fae1052

Please sign in to comment.