From 2e627204173afd516ea35b85e267192311b2ffee Mon Sep 17 00:00:00 2001 From: Zhang Lizhi Date: Wed, 8 Jan 2025 15:13:52 +0800 Subject: [PATCH] Persistence Component: Fix staging filters saved as String + Fix new stats for Bulk Load coming as String (#3321) --- .../persistence/components/util/SqlUtils.java | 27 ++++++++++++++----- .../bigquery/executor/BigQueryExecutor.java | 18 +++++++------ .../executor/RelationalExecutor.java | 10 ++++--- .../relational/snowflake/SnowflakeSink.java | 12 ++++----- 4 files changed, 42 insertions(+), 25 deletions(-) diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SqlUtils.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SqlUtils.java index 10425ada738..303216b6883 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SqlUtils.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-physical-plan/src/main/java/org/finos/legend/engine/persistence/components/util/SqlUtils.java @@ -21,36 +21,49 @@ public class SqlUtils { - - public static String getEnrichedSql(Map placeholderKeyValues, String sql) + public static String getEnrichedSql(Map placeholderKeyValues, String sql, String batchIdPattern) { String enrichedSql = sql; for (Map.Entry entry : placeholderKeyValues.entrySet()) { - enrichedSql = enrichedSql.replaceAll(Pattern.quote(entry.getKey()), entry.getValue().value()); + enrichedSql = replacePlaceholderWithActualValue(enrichedSql, entry.getKey(), entry.getValue().value(), batchIdPattern); } return enrichedSql; } - private static String getEnrichedSqlWithMasking(Map placeholderKeyValues, String sql) + private static String getEnrichedSqlWithMasking(Map placeholderKeyValues, String sql, String batchIdPattern) { String enrichedSql = sql; for (Map.Entry entry : placeholderKeyValues.entrySet()) { if (!entry.getValue().isSensitive()) { - enrichedSql = enrichedSql.replaceAll(Pattern.quote(entry.getKey()), entry.getValue().value()); + enrichedSql = replacePlaceholderWithActualValue(enrichedSql, entry.getKey(), entry.getValue().value(), batchIdPattern); } } return enrichedSql; } - public static void logSql(Logger logger, SqlLogging sqlLogging, String sqlBeforeReplacingPlaceholders, String sqlAfterReplacingPlaceholders, Map placeholderKeyValues) + private static String replacePlaceholderWithActualValue(String enrichedSql, String placeholder, String actualValue, String batchIdPattern) + { + if (placeholder.equals(batchIdPattern)) + { + // These are to address the issue of batch id patterns being quoted in multi-dataset flow + String singleQuotedPattern = String.format("'%s'", placeholder); + enrichedSql = enrichedSql.replaceAll(Pattern.quote(singleQuotedPattern), actualValue); + + String doubleQuotedPattern = String.format("\"%s\"", placeholder); + enrichedSql = enrichedSql.replaceAll(Pattern.quote(doubleQuotedPattern), actualValue); + } + return enrichedSql.replaceAll(Pattern.quote(placeholder), actualValue); + } + + public static void logSql(Logger logger, SqlLogging sqlLogging, String sqlBeforeReplacingPlaceholders, String sqlAfterReplacingPlaceholders, Map placeholderKeyValues, String batchIdPattern) { switch (sqlLogging) { case MASKED: - String maskedSql = getEnrichedSqlWithMasking(placeholderKeyValues, sqlBeforeReplacingPlaceholders); + String maskedSql = getEnrichedSqlWithMasking(placeholderKeyValues, sqlBeforeReplacingPlaceholders, batchIdPattern); logger.info(maskedSql); break; case UNMASKED: diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java index 7eaa2ae8b13..2c5b7de4f43 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java @@ -34,6 +34,8 @@ import java.util.List; import java.util.Map; +import static org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils.BATCH_ID_PATTERN; + public class BigQueryExecutor implements Executor { private final BigQuerySink bigQuerySink; @@ -64,8 +66,8 @@ public void executePhysicalPlan(SqlPlan physicalPlan, Map executeLoadPhysicalPlanAndGetStats(SqlPlan physicalPlan, Map placeholderKeyValues) { - String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, physicalPlan.getSqlList().get(0)); - SqlUtils.logSql(LOGGER, sqlLogging, physicalPlan.getSqlList().get(0), enrichedSql, placeholderKeyValues); + String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, physicalPlan.getSqlList().get(0), BATCH_ID_PATTERN); + SqlUtils.logSql(LOGGER, sqlLogging, physicalPlan.getSqlList().get(0), enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN); return bigQueryHelper.executeLoadStatement(enrichedSql); } @@ -105,8 +107,8 @@ public List executePhysicalPlanAndGetResults(SqlPlan physicalPlan, List resultSetList = new ArrayList<>(); for (String sql : physicalPlan.getSqlList()) { - String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql); - SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues); + String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN); + SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN); List> queryResult = bigQueryHelper.executeQuery(enrichedSql); if (!queryResult.isEmpty()) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java index 311a9a91af3..593266fcb83 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-core/src/main/java/org/finos/legend/engine/persistence/components/relational/executor/RelationalExecutor.java @@ -32,6 +32,8 @@ import java.util.List; import java.util.Map; +import static org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils.BATCH_ID_PATTERN; + public class RelationalExecutor implements Executor { private final RelationalSink relationalSink; @@ -60,8 +62,8 @@ public void executePhysicalPlan(SqlPlan physicalPlan, Map sqlList = physicalPlan.getSqlList(); for (String sql : sqlList) { - String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql); - SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues); + String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN); + SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN); relationalExecutionHelper.executeStatement(enrichedSql); } } @@ -104,8 +106,8 @@ public List executePhysicalPlanAndGetResults(SqlPlan physicalPlan, List resultSetList = new ArrayList<>(); for (String sql : physicalPlan.getSqlList()) { - String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql); - SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues); + String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN); + SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN); TabularData queryResultData = relationalExecutionHelper.executeQueryAndGetResultsAsTabularData(enrichedSql); if (!queryResultData.data().isEmpty()) { diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java index f007e918908..bd65042aba1 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-snowflake/src/main/java/org/finos/legend/engine/persistence/components/relational/snowflake/SnowflakeSink.java @@ -620,24 +620,24 @@ private void appendLoadQueryStats(Executor executo { case QueryStatsLogicalPlanUtils.EXTERNAL_SCAN_STAGE: Object externalScan = queryStats.get(QueryStatsLogicalPlanUtils.EXTERNAL_BYTES_SCANNED_ALIAS); - if (externalScan != null) + if (externalScan != null && !String.valueOf(externalScan).isEmpty()) { - stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, externalScan); + stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, Long.parseLong(String.valueOf(externalScan))); } else { - stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, 0); + stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, 0L); } break; case QueryStatsLogicalPlanUtils.INSERT_STAGE: Object insert = queryStats.get(QueryStatsLogicalPlanUtils.INPUT_ROWS_ALIAS); - if (insert != null) + if (insert != null && !String.valueOf(insert).isEmpty()) { - stats.put(StatisticName.INCOMING_RECORD_COUNT, insert); + stats.put(StatisticName.INCOMING_RECORD_COUNT, Long.parseLong(String.valueOf(insert))); } else { - stats.put(StatisticName.INCOMING_RECORD_COUNT, 0); + stats.put(StatisticName.INCOMING_RECORD_COUNT, 0L); } break; }