From 7f1d9fa866e01a7d9ceccb3344ae2e9e271441aa Mon Sep 17 00:00:00 2001 From: Leonid Shtivelman Date: Mon, 26 Feb 2024 17:07:10 -0500 Subject: [PATCH 01/27] update to use open telemetry --- .../pom.xml | 17 ++ .../postgres/PostgresServerLauncher.java | 11 +- .../engine/postgres/PostgresWireProtocol.java | 163 ++++++++++------ .../engine/postgres/ResultSetReceiver.java | 94 ++++++--- .../finos/legend/engine/postgres/Session.java | 179 ++++++++++++------ .../engine/postgres/config/ServerConfig.java | 7 + .../legend/LegendExecutionService.java | 31 ++- .../handler/legend/LegendStatement.java | 1 + .../engine/postgres/utils/OpenTelemetry.java | 108 +++++++++++ .../postgres/utils/PrometheusCollector.java | 84 ++++++++ .../PostgresServerSimpleTestClient.java | 30 +-- .../src/test/resources/staticConfig.json | 3 +- 12 files changed, 572 insertions(+), 156 deletions(-) create mode 100644 legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/OpenTelemetry.java create mode 100644 legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/PrometheusCollector.java diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/pom.xml b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/pom.xml index 5d5835c18f1..93d0d50a11c 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/pom.xml +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/pom.xml @@ -254,6 +254,23 @@ javax.ws.rs-api + + + + io.opentelemetry.javaagent + opentelemetry-javaagent + 2.0.0 + + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + 1.35.0 + runtime + + + + com.github.tomakehurst diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresServerLauncher.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresServerLauncher.java index f36278ad0f7..1daa8df0bac 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresServerLauncher.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresServerLauncher.java @@ -15,9 +15,7 @@ package org.finos.legend.engine.postgres; import com.fasterxml.jackson.databind.ObjectMapper; - import java.io.File; - import org.finos.legend.engine.postgres.auth.AuthenticationMethod; import org.finos.legend.engine.postgres.config.ServerConfig; import org.slf4j.Logger; @@ -62,6 +60,15 @@ public void launch() throws Exception AuthenticationMethod authenticationMethod = serverConfig.buildAuthenticationMethod(); logger.info("Starting server in port: " + serverConfig.getPort()); + + +// /* JvmMetrics.builder().register(); +// +// HTTPServer server = HTTPServer.builder().port(serverConfig.getMetricsPort()).buildAndStart(); +// +// System.out.println("HTTPServer listening on port http://localhost:" + server.getPort() + "/metrics");*/ + + new PostgresServer(serverConfig, sessionFactory, (user, connectionProperties) -> authenticationMethod).run(); } diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresWireProtocol.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresWireProtocol.java index 47e29bc36db..7f03e615919 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresWireProtocol.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/PostgresWireProtocol.java @@ -29,16 +29,19 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope; import org.finos.legend.engine.postgres.auth.AuthenticationMethod; import org.finos.legend.engine.postgres.auth.AuthenticationMethodType; import org.finos.legend.engine.postgres.auth.AuthenticationProvider; import org.finos.legend.engine.postgres.auth.KerberosIdentityProvider; import org.finos.legend.engine.postgres.config.GSSConfig; -import org.finos.legend.engine.postgres.handler.PostgresResultSet; import org.finos.legend.engine.postgres.handler.PostgresResultSetMetaData; import org.finos.legend.engine.postgres.types.PGType; import org.finos.legend.engine.postgres.types.PGTypes; import org.finos.legend.engine.postgres.utils.ExceptionUtil; +import org.finos.legend.engine.postgres.utils.OpenTelemetry; import org.finos.legend.engine.shared.core.identity.Identity; import org.finos.legend.engine.shared.core.kerberos.SubjectTools; import org.ietf.jgss.GSSContext; @@ -60,7 +63,6 @@ import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.sql.ParameterMetaData; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -807,24 +809,45 @@ private List createList(short size) */ private void handleDescribeMessage(ByteBuf buffer, Channel channel) throws Exception { - byte type = buffer.readByte(); - String portalOrStatement = readCString(buffer); - DescribeResult describeResult = session.describe((char) type, portalOrStatement); - PostgresResultSetMetaData fields = describeResult.getFields(); - if (type == 'S') + OpenTelemetry.TOTAL_METADATA.add(1); + OpenTelemetry.ACTIVE_METADATA.add(1); + long startTime = System.currentTimeMillis(); + + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("PostgresWireProtocol.handleDescribeMessage").startSpan(); + try (Scope scope = span.makeCurrent()) { - ParameterMetaData parameters = describeResult.getParameters(); - Messages.sendParameterDescription(channel, parameters); + byte type = buffer.readByte(); + String portalOrStatement = readCString(buffer); + DescribeResult describeResult = session.describe((char) type, portalOrStatement); + PostgresResultSetMetaData fields = describeResult.getFields(); + if (type == 'S') + { + ParameterMetaData parameters = describeResult.getParameters(); + Messages.sendParameterDescription(channel, parameters); + } + if (fields == null) + { + Messages.sendNoData(channel); + } + else + { + FormatCodes.FormatCode[] resultFormatCodes = + type == 'P' ? session.getResultFormatCodes(portalOrStatement) : null; + Messages.sendRowDescription(channel, fields, resultFormatCodes); + } + OpenTelemetry.TOTAL_SUCCESS_METADATA.add(1); + OpenTelemetry.METADATA_DURATION.record(System.currentTimeMillis() - startTime); } - if (fields == null) + catch (Exception e) { - Messages.sendNoData(channel); + span.recordException(e); + OpenTelemetry.TOTAL_FAILURE_METADATA.add(1); + throw e; } - else + finally { - FormatCodes.FormatCode[] resultFormatCodes = - type == 'P' ? session.getResultFormatCodes(portalOrStatement) : null; - Messages.sendRowDescription(channel, fields, resultFormatCodes); + OpenTelemetry.ACTIVE_METADATA.add(-1); } } @@ -835,9 +858,15 @@ private void handleDescribeMessage(ByteBuf buffer, Channel channel) throws Excep */ private void handleExecute(ByteBuf buffer, DelayableWriteChannel channel) { - String portalName = readCString(buffer); - int maxRows = buffer.readInt(); - String query = session.getQuery(portalName); + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("PostgresWireProtocol.handleExecute").startSpan(); + try (Scope scope = span.makeCurrent()) + { + String portalName = readCString(buffer); + int maxRows = buffer.readInt(); + String query = session.getQuery(portalName); + span.setAttribute("portalName", portalName); + span.setAttribute("query", query); /* if (query.isEmpty()) { // remove portal so that it doesn't stick around and no attempt to batch it with follow up statement is made session.close((byte) 'P', portalName); @@ -889,16 +918,19 @@ private void handleExecute(ByteBuf buffer, DelayableWriteChannel channel) session.execute(portalName, maxRows, resultReceiver);*/ - try - { DelayableWriteChannel.DelayedWrites delayedWrites = channel.delayWrites(); ResultSetReceiver resultReceiver = new ResultSetReceiver(query, channel, delayedWrites, false, null); session.execute(portalName, maxRows, resultReceiver); } catch (Exception e) { + span.recordException(e); throw ExceptionUtil.wrapException(e); } + finally + { + span.end(); + } } @@ -972,57 +1004,78 @@ private void handleClose(ByteBuf buffer, Channel channel) }*/ void handleSimpleQuery(ByteBuf buffer, final DelayableWriteChannel channel) { - String queryString = readCString(buffer); - assert queryString != null : "query must not be nulL"; + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("PostgresWireProtocol.handleSimpleQuery").startSpan(); + try (Scope scope = span.makeCurrent()) + { + String queryString = readCString(buffer); + assert queryString != null : "query must not be nulL"; + span.setAttribute("query", queryString); + if (queryString.isEmpty() || ";".equals(queryString)) + { + Messages.sendEmptyQueryResponse(channel); + Messages.sendReadyForQuery(channel); + return; + } - if (queryString.isEmpty() || ";".equals(queryString)) + List queries = QueryStringSplitter.splitQuery(queryString); + CompletableFuture composedFuture = CompletableFuture.completedFuture(null); + for (String query : queries) + { + composedFuture = composedFuture.thenCompose(result -> handleSingleQuery(query, channel)); + } + composedFuture.whenComplete(new ReadyForQueryCallback(channel)); + } + catch (Exception e) { - Messages.sendEmptyQueryResponse(channel); - Messages.sendReadyForQuery(channel); - return; + span.recordException(e); + throw e; } - - List queries = QueryStringSplitter.splitQuery(queryString); - CompletableFuture composedFuture = CompletableFuture.completedFuture(null); - for (String query : queries) + finally { - composedFuture = composedFuture.thenCompose(result -> handleSingleQuery(query, channel)); + span.end(); } - composedFuture.whenComplete(new ReadyForQueryCallback(channel)); - } private CompletableFuture handleSingleQuery(String query, DelayableWriteChannel channel) { - CompletableFuture result = new CompletableFuture<>(); - - if (query.isEmpty() || ";".equals(query)) + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("PostgresWireProtocol.handleSimpleQuery").startSpan(); + try (Scope scope = span.makeCurrent()) { - Messages.sendEmptyQueryResponse(channel); - result.complete(null); - return result; - } - try - { - DelayableWriteChannel.DelayedWrites delayedWrites = channel.delayWrites(); - ResultSetReceiver resultReceiver = new ResultSetReceiver(query, channel, delayedWrites, true, null); - session.executeSimple(query, resultReceiver); - return session.sync(); + CompletableFuture result = new CompletableFuture<>(); + + if (query.isEmpty() || ";".equals(query)) + { + Messages.sendEmptyQueryResponse(channel); + result.complete(null); + return result; + } + try + { + DelayableWriteChannel.DelayedWrites delayedWrites = channel.delayWrites(); + ResultSetReceiver resultReceiver = new ResultSetReceiver(query, channel, delayedWrites, true, null); + session.executeSimple(query, resultReceiver); + return session.sync(); + } + catch (Throwable t) + { + //TODO need to understand this usecase + LOGGER.warn("Error processing single query", t); + session.clearState(); + Messages.sendErrorResponse(channel, t); + result.completeExceptionally(t); + return result; + } } - catch (Throwable t) + finally { - //TODO need to understand this usecase - LOGGER.warn("Error processing single query", t); - session.clearState(); - Messages.sendErrorResponse(channel, t); - result.completeExceptionally(t); - return result; + span.end(); } - } @@ -1054,7 +1107,7 @@ public GSSCredential run() throws Exception { final GSSName gssName = manager.createName(this.accountPrincipal, GSSName.NT_USER_NAME); return manager - .createCredential(gssName, GSSCredential.DEFAULT_LIFETIME, new Oid[]{ + .createCredential(gssName, GSSCredential.DEFAULT_LIFETIME, new Oid[] { new Oid("1.2.840.113554.1.2.2"), // Kerberos v5 new Oid("1.3.6.1.5.5.2") // SPNEGO }, GSSCredential.ACCEPT_ONLY); diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/ResultSetReceiver.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/ResultSetReceiver.java index 24e6c1887e0..c7cb98e2122 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/ResultSetReceiver.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/ResultSetReceiver.java @@ -23,6 +23,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.common.Attributes; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -31,6 +36,7 @@ import org.finos.legend.engine.postgres.handler.PostgresResultSetMetaData; import org.finos.legend.engine.postgres.types.PGType; import org.finos.legend.engine.postgres.types.PGTypes; +import org.finos.legend.engine.postgres.utils.OpenTelemetry; import org.slf4j.Logger; class ResultSetReceiver @@ -63,49 +69,81 @@ class ResultSetReceiver public void sendResultSet(PostgresResultSet rs) throws Exception { - int rowCount = 0; - if (rs != null) + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("ResultSetReceiver.sendResultSet").startSpan(); + try (Scope scope = span.makeCurrent()) { - if (isSimpleQuery) + long rowCount = 0; + if (rs != null) { - //Simple query requires to send description - Messages.sendRowDescription(directChannel, rs.getMetaData(), formatCodes); - } - PostgresResultSetMetaData metaData = rs.getMetaData(); - List columnTypes = new ArrayList<>(metaData.getColumnCount()); - for (int i = 0; i < metaData.getColumnCount(); i++) - { - PGType pgType = PGTypes.get(metaData.getColumnType(i + 1), metaData.getScale(i + 1)); - columnTypes.add(pgType); - } - while (rs.next()) - { - rowCount++; - Messages.sendDataRow(directChannel, rs, columnTypes, null); - if (rowCount % 1000 == 0) - { //TODO REMOVE FLASH FROM Messages.sendDataRow - directChannel.flush(); + if (isSimpleQuery) + { + span.addEvent("simpleQuery-sendRowDescription"); + //Simple query requires to send description + Messages.sendRowDescription(directChannel, rs.getMetaData(), formatCodes); } + PostgresResultSetMetaData metaData = rs.getMetaData(); + List columnTypes = new ArrayList<>(metaData.getColumnCount()); + for (int i = 0; i < metaData.getColumnCount(); i++) + { + PGType pgType = PGTypes.get(metaData.getColumnType(i + 1), metaData.getScale(i + 1)); + columnTypes.add(pgType); + } + //TODO add column types to the span + span.addEvent("startSendingData"); + while (rs.next()) + { + rowCount++; + Messages.sendDataRow(directChannel, rs, columnTypes, null); + if (rowCount % 1000 == 0) + { //TODO REMOVE FLASH FROM Messages.sendDataRow + directChannel.flush(); + span.addEvent("sentRows", Attributes.of(AttributeKey.longKey("numberOfRows"), rowCount)); + } + } + span.addEvent("finishedSendingData", Attributes.of(AttributeKey.longKey("numberOfRows"), rowCount)); } } + finally + { + span.end(); + } LOGGER.info("Query complete with row count {}", rowCount); } public void allFinished() { - ChannelFuture sendCommandComplete = Messages.sendCommandComplete(directChannel, query, rowCount); - channel.writePendingMessages(delayedWrites); - channel.flush(); - sendCommandComplete.addListener(future -> completionFuture.complete(null)); + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("ResultSetReceiver.allFinished").startSpan(); + try (Scope scope = span.makeCurrent()) + { + ChannelFuture sendCommandComplete = Messages.sendCommandComplete(directChannel, query, rowCount); + channel.writePendingMessages(delayedWrites); + channel.flush(); + sendCommandComplete.addListener(future -> completionFuture.complete(null)); + } + finally + { + span.end(); + } } public void fail(Throwable throwable) { - ChannelFuture sendErrorResponse = Messages.sendErrorResponse(directChannel, throwable); - channel.writePendingMessages(delayedWrites); - channel.flush(); - sendErrorResponse.addListener(f -> completionFuture.completeExceptionally(throwable)); + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("ResultSetReceiver.fail").startSpan(); + try (Scope scope = span.makeCurrent()) + { + ChannelFuture sendErrorResponse = Messages.sendErrorResponse(directChannel, throwable); + channel.writePendingMessages(delayedWrites); + channel.flush(); + sendErrorResponse.addListener(f -> completionFuture.completeExceptionally(throwable)); + } + finally + { + span.end(); + } } public CompletableFuture completionFuture() diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/Session.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/Session.java index 26934c59c52..3d335ee73be 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/Session.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/Session.java @@ -21,6 +21,10 @@ package org.finos.legend.engine.postgres; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Context; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -35,9 +39,11 @@ import org.finos.legend.engine.postgres.handler.PostgresStatement; import org.finos.legend.engine.postgres.handler.SessionHandler; import org.finos.legend.engine.postgres.utils.ExceptionUtil; +import org.finos.legend.engine.postgres.utils.OpenTelemetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class Session implements AutoCloseable { @@ -52,6 +58,8 @@ public Session(SessionHandler dataSessionHandler, SessionHandler metaDataSession { this.executorService = executorService; this.dispatcher = new ExecutionDispatcher(dataSessionHandler, metaDataSessionHandler); + OpenTelemetry.ACTIVE_SESSIONS.add(1); + OpenTelemetry.TOTAL_SESSIONS.add(1); } public CompletableFuture sync() @@ -83,7 +91,7 @@ public void parse(String statementName, String query, List paramTypes) Prepared p = new Prepared(); p.name = statementName; p.sql = query; - p.paramType = paramTypes.toArray(new Integer[]{}); + p.paramType = paramTypes.toArray(new Integer[] {}); if (query != null) { @@ -177,54 +185,65 @@ public DescribeResult describe(char type, String portalOrStatement) { LOGGER.debug("method=describe type={} portalOrStatement={}", type, portalOrStatement); } - switch (type) + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("ResultSetReceiver.allFinished").startSpan(); + try (Scope scope = span.makeCurrent()) { - case 'P': - Portal portal = getSafePortal(portalOrStatement); - return describe('S', portal.prep.name); - case 'S': - /* - * describe might be called without prior bind call. - * - * If the client uses server-side prepared statements this is usually the case. - * - * E.g. the statement is first prepared: - * - * parse stmtName=S_1 query=insert into t (x) values ($1) paramTypes=[integer] - * describe type=S portalOrStatement=S_1 - * sync - * - * and then used with different bind calls: - * - * bind portalName= statementName=S_1 params=[0] - * describe type=P portalOrStatement= - * execute - * - * bind portalName= statementName=S_1 params=[1] - * describe type=P portalOrStatement= - * execute - */ - - Prepared prepared = parsed.get(portalOrStatement); - try - { - PostgresPreparedStatement preparedStatement = prepared.prep; - if (portalOrStatement == null) + span.setAttribute("type", type); + span.setAttribute("name", portalOrStatement); + switch (type) + { + case 'P': + Portal portal = getSafePortal(portalOrStatement); + return describe('S', portal.prep.name); + case 'S': + /* + * describe might be called without prior bind call. + * + * If the client uses server-side prepared statements this is usually the case. + * + * E.g. the statement is first prepared: + * + * parse stmtName=S_1 query=insert into t (x) values ($1) paramTypes=[integer] + * describe type=S portalOrStatement=S_1 + * sync + * + * and then used with different bind calls: + * + * bind portalName= statementName=S_1 params=[0] + * describe type=P portalOrStatement= + * execute + * + * bind portalName= statementName=S_1 params=[1] + * describe type=P portalOrStatement= + * execute + */ + + Prepared prepared = parsed.get(portalOrStatement); + try { - return new DescribeResult(null, null); + PostgresPreparedStatement preparedStatement = prepared.prep; + if (portalOrStatement == null) + { + return new DescribeResult(null, null); + } + else + { + return new DescribeResult(preparedStatement.getMetaData(), + preparedStatement.getParameterMetaData()); + } } - else + catch (Exception e) { - return new DescribeResult(preparedStatement.getMetaData(), - preparedStatement.getParameterMetaData()); + throw ExceptionUtil.wrapException(e); } - } - catch (Exception e) - { - throw ExceptionUtil.wrapException(e); - } - default: - throw new AssertionError("Unsupported type: " + type); + default: + throw new AssertionError("Unsupported type: " + type); + } + } + finally + { + span.end(); } } @@ -247,6 +266,7 @@ public String getQuery(String portalName) public void close() { clearState(); + OpenTelemetry.ACTIVE_SESSIONS.add(-1); } public void close(char type, String name) @@ -319,13 +339,16 @@ public void close(char type, String name) public CompletableFuture execute(String portalName, int maxRows, ResultSetReceiver resultSetReceiver) { - Portal portal = getSafePortal(portalName); - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Executing query {}/{} ", portalName, portal.prep.sql); - } - try + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("Session.execute").startSpan(); + try (Scope scope = span.makeCurrent()) { + Portal portal = getSafePortal(portalName); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Executing query {}/{} ", portalName, portal.prep.sql); + } + //TODO IDENTIFY THE USE CASE PostgresPreparedStatement preparedStatement = portal.prep.prep; if (preparedStatement == null) @@ -350,22 +373,30 @@ public CompletableFuture execute(String portalName, int maxRows, ResultSetRec } catch (Exception e) { + span.recordException(e); throw ExceptionUtil.wrapException(e); } + finally + { + span.end(); + } } public CompletableFuture executeSimple(String query, ResultSetReceiver resultSetReceiver) { + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Executing simple {} ", query); } - try + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("Session.executeSimple").startSpan(); + try (Scope scope = span.makeCurrent()) { PostgresStatement statement = getSessionHandler(query).createStatement(); - executorService.submit(new StatementExecutionTask(statement, query, resultSetReceiver)); - + span.addEvent("submit StatementExecutionTask"); + Context.taskWrapping(executorService).submit(new StatementExecutionTask(statement, query, resultSetReceiver)); if (activeExecution == null) { activeExecution = resultSetReceiver.completionFuture(); @@ -379,8 +410,13 @@ public CompletableFuture executeSimple(String query, ResultSetReceiver result } catch (Exception e) { + span.recordException(e); throw ExceptionUtil.wrapException(e); } + finally + { + span.end(); + } } @@ -495,9 +531,16 @@ public StatementExecutionTask(PostgresStatement statement, String query, ResultS @Override public Boolean call() throws Exception { - try + OpenTelemetry.TOTAL_EXECUTE.add(1); + OpenTelemetry.ACTIVE_EXECUTE.add(1); + long startTime = System.currentTimeMillis(); + + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("StatementExecutionTask.call").startSpan(); + try (Scope scope = span.makeCurrent()) { boolean results = statement.execute(query); + span.addEvent("receivedResults"); if (!results) { resultSetReceiver.allFinished(); @@ -508,10 +551,19 @@ public Boolean call() throws Exception resultSetReceiver.sendResultSet(rs); resultSetReceiver.allFinished(); } + OpenTelemetry.TOTAL_SUCCESS_EXECUTE.add(1); + OpenTelemetry.EXECUTE_DURATION.record(System.currentTimeMillis() - startTime); } catch (Exception e) { + span.recordException(e); resultSetReceiver.fail(e); + OpenTelemetry.TOTAL_FAILURE_EXECUTE.add(1); + } + finally + { + span.end(); + OpenTelemetry.ACTIVE_EXECUTE.add(-1); } return true; } @@ -531,9 +583,16 @@ public PreparedStatementExecutionTask(PostgresPreparedStatement preparedStatemen @Override public Boolean call() throws Exception { - try + OpenTelemetry.TOTAL_EXECUTE.add(1); + OpenTelemetry.ACTIVE_EXECUTE.add(1); + long startTime = System.currentTimeMillis(); + + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("PreparedStatementExecutionTask.call").startSpan(); + try (Scope scope = span.makeCurrent()) { boolean results = preparedStatement.execute(); + span.addEvent("receivedResults"); if (!results) { resultSetReceiver.allFinished(); @@ -544,10 +603,20 @@ public Boolean call() throws Exception resultSetReceiver.sendResultSet(rs); resultSetReceiver.allFinished(); } + OpenTelemetry.TOTAL_SUCCESS_EXECUTE.add(1); } catch (Exception e) { + span.recordException(e); resultSetReceiver.fail(e); + OpenTelemetry.TOTAL_FAILURE_EXECUTE.add(1); + OpenTelemetry.EXECUTE_DURATION.record(System.currentTimeMillis() - startTime); + + } + finally + { + span.end(); + OpenTelemetry.ACTIVE_EXECUTE.add(-1); } return true; } diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/config/ServerConfig.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/config/ServerConfig.java index 4c96a1311c5..f955e580749 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/config/ServerConfig.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/config/ServerConfig.java @@ -33,6 +33,13 @@ public class ServerConfig private IdentityType identityType; private GSSConfig gss; + public Integer getMetricsPort() + { + return metricsPort; + } + + private Integer metricsPort; + public String getLogConfigFile() { return logConfigFile; diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendExecutionService.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendExecutionService.java index 223eeaca353..8e00bb94b94 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendExecutionService.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendExecutionService.java @@ -18,11 +18,19 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import org.eclipse.collections.api.list.MutableList; import org.eclipse.collections.impl.utility.internal.IterableIterate; +import org.finos.legend.engine.postgres.utils.OpenTelemetry; import org.finos.legend.engine.shared.core.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,13 +50,18 @@ public LegendExecutionService(LegendClient executionClient) public List getSchema(String query) { - try (InputStream inputStream = executionClient.executeSchemaApi(query);) + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("LegendExecutionService.getSchema").startSpan(); + try (Scope scope = span.makeCurrent(); InputStream inputStream = executionClient.executeSchemaApi(query);) { + span.setAttribute("query", query); JsonNode jsonNode = mapper.readTree(inputStream); if (jsonNode.get("columns") != null) { ArrayNode columns = (ArrayNode) jsonNode.get("columns"); - return IterableIterate.collect(columns, c -> new LegendColumn(c.get("name").textValue(), c.get("type").textValue())); + List legendColumns = Collections.unmodifiableList(IterableIterate.collect(columns, c -> new LegendColumn(c.get("name").textValue(), c.get("type").textValue()))); + span.setAttribute(AttributeKey.stringArrayKey("columns"), legendColumns.stream().map(legendColumn -> legendColumn.toString()).collect(Collectors.toList())); + return legendColumns; } return Collections.emptyList(); } @@ -56,14 +69,22 @@ public List getSchema(String query) { throw new LegendTdsClientException("Failed to parse result", e); } + finally + { + span.end(); + } } public LegendExecutionResult executeQuery(String query) { - try + Tracer tracer = OpenTelemetry.getTracer(); + Span span = tracer.spanBuilder("LegendExecutionService.getSchema").startSpan(); + try (Scope scope = span.makeCurrent();) { + span.setAttribute("query", query); InputStream inputStream = executionClient.executeQueryApi(query); + span.addEvent("receivedResponse"); LegendTdsResultParser parser = new LegendTdsResultParser(inputStream); return new LegendExecutionResult() @@ -116,6 +137,10 @@ public List next() { throw new LegendTdsClientException("Error while parsing response", e); } + finally + { + span.end(); + } } diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendStatement.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendStatement.java index 89c54a2ff5f..80f59b44c36 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendStatement.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/handler/legend/LegendStatement.java @@ -18,6 +18,7 @@ import javax.security.auth.Subject; import org.finos.legend.engine.postgres.handler.PostgresResultSet; import org.finos.legend.engine.postgres.handler.PostgresStatement; +import org.finos.legend.engine.postgres.utils.PrometheusCollector; import org.finos.legend.engine.shared.core.identity.Identity; import org.finos.legend.engine.shared.core.identity.credential.LegendKerberosCredential; diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/OpenTelemetry.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/OpenTelemetry.java new file mode 100644 index 00000000000..2c6079ade22 --- /dev/null +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/OpenTelemetry.java @@ -0,0 +1,108 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package org.finos.legend.engine.postgres.utils; + +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Tracer; + +public class OpenTelemetry +{ + private static final String LEGEND_ENGINE_XTS_SQL = "legend-engine-xts-sql"; + + public static final LongUpDownCounter ACTIVE_SESSIONS = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .upDownCounterBuilder("active_sessions") + .setDescription("Number of active sessions") + .build(); + + public static final LongCounter TOTAL_SESSIONS = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_sessions") + .setDescription("Total of sessions").build(); + + + public static final LongUpDownCounter ACTIVE_EXECUTE = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .upDownCounterBuilder("active_execute_request") + .setDescription("Number of active execute requests") + .build(); + + + public static final LongCounter TOTAL_EXECUTE = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_execute_requests") + .setDescription("Total of execute requests").build(); + + + public static final LongCounter TOTAL_SUCCESS_EXECUTE = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_success_execute_requests") + .setDescription("Total of success execute requests").build(); + + + public static final LongCounter TOTAL_FAILURE_EXECUTE = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_failure_execute_requests") + .setDescription("Total of failure execute requests").build(); + + + public static final DoubleHistogram EXECUTE_DURATION = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .histogramBuilder("execute_requests_duration") + .setDescription("Total of success execute requests") + .build(); + + + public static final LongUpDownCounter ACTIVE_METADATA = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .upDownCounterBuilder("active_metadata_requests") + .setDescription("Number of active metadata requests") + .build(); + + + public static final LongCounter TOTAL_METADATA = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_metadata_requests") + .setDescription("Total of metadata requests").build(); + + + public static final LongCounter TOTAL_SUCCESS_METADATA = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_success_metadata_requests") + .setDescription("Total of success metadata requests") + .build(); + + public static final LongCounter TOTAL_FAILURE_METADATA = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .counterBuilder("total_failure_metadata_requests") + .setDescription("Total of failure metadata requests") + .build(); + + public static final DoubleHistogram METADATA_DURATION = GlobalOpenTelemetry.get() + .getMeter(LEGEND_ENGINE_XTS_SQL) + .histogramBuilder("metadata_requests_duration") + .setDescription(("Execute duration")) + .build(); + + public static Tracer getTracer() + { + return GlobalOpenTelemetry.getTracer(LEGEND_ENGINE_XTS_SQL); + } + +} diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/PrometheusCollector.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/PrometheusCollector.java new file mode 100644 index 00000000000..c6aaf134eb9 --- /dev/null +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/main/java/org/finos/legend/engine/postgres/utils/PrometheusCollector.java @@ -0,0 +1,84 @@ +// Copyright 2023 Goldman Sachs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package org.finos.legend.engine.postgres.utils; + +/*import io.prometheus.metrics.core.metrics.Counter; +import io.prometheus.metrics.core.metrics.Gauge; +import io.prometheus.metrics.core.metrics.Histogram; +import io.prometheus.metrics.model.snapshots.Unit;*/ + +public class PrometheusCollector +{ +/* + + public static final Gauge ACTIVE_SESSIONS = Gauge.builder() + .name("current_active_sessions") + .help("Number of active sessions") + .register(); + public static final Counter TOTAL_SESSIONS = Counter.builder() + .name("total_active_sessions") + .help("Total of active sessions") + .register(); + public static final Gauge ACTIVE_EXECUTE = Gauge.builder() + .name("active_sessions") + .help("Number of active execute requests") + .register(); + + public static final Counter TOTAL_EXECUTE = Counter.builder() + .name("total_execute_requests") + .help("Total of execute requests") + .register(); + public static final Counter TOTAL_SUCCESS_EXECUTE = Counter.builder() + .name("total_success_execute_requests") + .help("Total of success execute requests") + .register(); + public static final Counter TOTAL_FAILURE_EXECUTE = Counter.builder() + .name("total_failure_execute_requests") + .help("Total of failure execute requests") + .register(); + + public static final Histogram EXECUTE_DURATION = Histogram.builder() + .name("execute_requests_duration") + .help("Execute duration") + .unit(Unit.SECONDS) + .register(); + + public static final Gauge ACTIVE_METADATA = Gauge.builder() + .name("active_metadata_requests") + .help("Number of active metadata requests") + .register(); + public static final Counter TOTAL_METADATA = Counter.builder() + .name("total_metadata_requests") + .help("Total of metadata requests") + .register(); + public static final Counter TOTAL_SUCCESS_METADATA = Counter.builder() + .name("total_success_metadata_requests") + .help("Total of success metadata requests") + .register(); + public static final Counter TOTAL_FAILURE_METADATA = Counter.builder() + .name("total_failure_metadata_requests") + .help("Total of failure metadata requests") + .register(); + + public static final Histogram METADATA_DURATION = Histogram.builder() + .name("metadata_requests_duration") + .help("Execute duration") + .unit(Unit.SECONDS) + .register(); +*/ + + +} diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/java/org/finos/legend/engine/postgres/PostgresServerSimpleTestClient.java b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/java/org/finos/legend/engine/postgres/PostgresServerSimpleTestClient.java index 8a1eb9698c7..ab794dada80 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/java/org/finos/legend/engine/postgres/PostgresServerSimpleTestClient.java +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/java/org/finos/legend/engine/postgres/PostgresServerSimpleTestClient.java @@ -29,26 +29,32 @@ public class PostgresServerSimpleTestClient public static void main(String[] args) throws Exception { - try ( - Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9998/postgres", - "dummy", "dummy"); - PreparedStatement statement = connection.prepareStatement("SELECT * FROM service.\"/personService\""); - ResultSet resultSet = statement.executeQuery() - ) + for (int j = 0; j < 10; j++) { - - int columnCount = resultSet.getMetaData().getColumnCount(); - while (resultSet.next()) + try ( + Connection connection = DriverManager.getConnection("jdbc:postgresql://127.0.0.1:9998/postgres", + "dummy", "dummy"); + PreparedStatement statement = connection.prepareStatement("SELECT * FROM service.\"/personService\""); + ResultSet resultSet = statement.executeQuery() + ) { - for (int i = 1; i < +columnCount; i++) + + + int columnCount = resultSet.getMetaData().getColumnCount(); + while (resultSet.next()) { - System.out.println(resultSet.getMetaData().getColumnName(i) + " : " + resultSet.getObject(i)); + for (int i = 1; i < +columnCount; i++) + { + System.out.println(resultSet.getMetaData().getColumnName(i) + " : " + resultSet.getObject(i)); + } + System.out.println("\n"); } - System.out.println("\n"); + } } + } diff --git a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/resources/staticConfig.json b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/resources/staticConfig.json index 4f73fedc3ae..d9650e8da1d 100644 --- a/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/resources/staticConfig.json +++ b/legend-engine-xts-sql/legend-engine-xt-sql-postgres-server/src/test/resources/staticConfig.json @@ -1,12 +1,13 @@ { "port": 9998, + "metricsPort" : 8802, "authenticationMethod": "NO_PASSWORD", "identityType": "ANONYMOUS", "handler": { "className": "org.finos.legend.engine.postgres.config.StaticHandlerConfig", "result": "legend-engine-xt-sql-postgres-server/src/test/resources/legendTdsResult.json", "schema": "legend-engine-xt-sql-postgres-server/src/test/resources/legendTdsSchema.json", - "delay": 6000, + "delay": 1000, "type": "STATIC" }, "logConfigFile": "legend-engine-xt-sql-postgres-server/src/test/resources/logback-test.xml" From 4fa7abc6cd3a85cafc3e6c3f89601d18af28c482 Mon Sep 17 00:00:00 2001 From: Leonid Shtivelman Date: Mon, 26 Feb 2024 17:19:18 -0500 Subject: [PATCH 02/27] update to use open telemetry --- ...gresServerLauncher_With_OpenTracing_Agen.xml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml diff --git a/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml b/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml new file mode 100644 index 00000000000..ed741a3d895 --- /dev/null +++ b/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml @@ -0,0 +1,17 @@ + + + + \ No newline at end of file From 78b57f357da11ef2371f212a0e9c8b0342777243 Mon Sep 17 00:00:00 2001 From: Leonid Shtivelman Date: Mon, 4 Mar 2024 14:20:35 -0500 Subject: [PATCH 03/27] update to use open telemetry --- .../TestPostgresServerLauncher_With_OpenTracing_Agen.xml | 2 +- .../finos/legend/engine/postgres/PostgresWireProtocol.java | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml b/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml index ed741a3d895..c6ec1c1071b 100644 --- a/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml +++ b/legend-engine-xts-sql/.idea/runConfigurations/TestPostgresServerLauncher_With_OpenTracing_Agen.xml @@ -3,7 +3,7 @@