Skip to content

Commit

Permalink
Persistence Component: Fix staging filters saved as String + Fix new …
Browse files Browse the repository at this point in the history
…stats for Bulk Load coming as String (finos#3321)
  • Loading branch information
kumuwu authored Jan 8, 2025
1 parent d3accad commit 2e62720
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,49 @@

public class SqlUtils
{

public static String getEnrichedSql(Map<String, PlaceholderValue> placeholderKeyValues, String sql)
public static String getEnrichedSql(Map<String, PlaceholderValue> placeholderKeyValues, String sql, String batchIdPattern)
{
String enrichedSql = sql;
for (Map.Entry<String, PlaceholderValue> entry : placeholderKeyValues.entrySet())
{
enrichedSql = enrichedSql.replaceAll(Pattern.quote(entry.getKey()), entry.getValue().value());
enrichedSql = replacePlaceholderWithActualValue(enrichedSql, entry.getKey(), entry.getValue().value(), batchIdPattern);
}
return enrichedSql;
}

private static String getEnrichedSqlWithMasking(Map<String, PlaceholderValue> placeholderKeyValues, String sql)
private static String getEnrichedSqlWithMasking(Map<String, PlaceholderValue> placeholderKeyValues, String sql, String batchIdPattern)
{
String enrichedSql = sql;
for (Map.Entry<String, PlaceholderValue> entry : placeholderKeyValues.entrySet())
{
if (!entry.getValue().isSensitive())
{
enrichedSql = enrichedSql.replaceAll(Pattern.quote(entry.getKey()), entry.getValue().value());
enrichedSql = replacePlaceholderWithActualValue(enrichedSql, entry.getKey(), entry.getValue().value(), batchIdPattern);
}
}
return enrichedSql;
}

public static void logSql(Logger logger, SqlLogging sqlLogging, String sqlBeforeReplacingPlaceholders, String sqlAfterReplacingPlaceholders, Map<String, PlaceholderValue> placeholderKeyValues)
private static String replacePlaceholderWithActualValue(String enrichedSql, String placeholder, String actualValue, String batchIdPattern)
{
if (placeholder.equals(batchIdPattern))
{
// These are to address the issue of batch id patterns being quoted in multi-dataset flow
String singleQuotedPattern = String.format("'%s'", placeholder);
enrichedSql = enrichedSql.replaceAll(Pattern.quote(singleQuotedPattern), actualValue);

String doubleQuotedPattern = String.format("\"%s\"", placeholder);
enrichedSql = enrichedSql.replaceAll(Pattern.quote(doubleQuotedPattern), actualValue);
}
return enrichedSql.replaceAll(Pattern.quote(placeholder), actualValue);
}

public static void logSql(Logger logger, SqlLogging sqlLogging, String sqlBeforeReplacingPlaceholders, String sqlAfterReplacingPlaceholders, Map<String, PlaceholderValue> placeholderKeyValues, String batchIdPattern)
{
switch (sqlLogging)
{
case MASKED:
String maskedSql = getEnrichedSqlWithMasking(placeholderKeyValues, sqlBeforeReplacingPlaceholders);
String maskedSql = getEnrichedSqlWithMasking(placeholderKeyValues, sqlBeforeReplacingPlaceholders, batchIdPattern);
logger.info(maskedSql);
break;
case UNMASKED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.List;
import java.util.Map;

import static org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils.BATCH_ID_PATTERN;

public class BigQueryExecutor implements Executor<SqlGen, TabularData, SqlPlan>
{
private final BigQuerySink bigQuerySink;
Expand Down Expand Up @@ -64,26 +66,26 @@ public void executePhysicalPlan(SqlPlan physicalPlan, Map<String, PlaceholderVal
{
for (String sql : sqlList)
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
bigQueryHelper.executeQuery(enrichedSql);
}
}
else
{
for (String sql : sqlList)
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
bigQueryHelper.executeStatement(enrichedSql);
}
}
}

public Map<StatisticName, Object> executeLoadPhysicalPlanAndGetStats(SqlPlan physicalPlan, Map<String, PlaceholderValue> placeholderKeyValues)
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, physicalPlan.getSqlList().get(0));
SqlUtils.logSql(LOGGER, sqlLogging, physicalPlan.getSqlList().get(0), enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, physicalPlan.getSqlList().get(0), BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, physicalPlan.getSqlList().get(0), enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
return bigQueryHelper.executeLoadStatement(enrichedSql);
}

Expand All @@ -105,8 +107,8 @@ public List<TabularData> executePhysicalPlanAndGetResults(SqlPlan physicalPlan,
List<TabularData> resultSetList = new ArrayList<>();
for (String sql : physicalPlan.getSqlList())
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
List<Map<String, Object>> queryResult = bigQueryHelper.executeQuery(enrichedSql);
if (!queryResult.isEmpty())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.List;
import java.util.Map;

import static org.finos.legend.engine.persistence.components.relational.api.utils.IngestionUtils.BATCH_ID_PATTERN;

public class RelationalExecutor implements Executor<SqlGen, TabularData, SqlPlan>
{
private final RelationalSink relationalSink;
Expand Down Expand Up @@ -60,8 +62,8 @@ public void executePhysicalPlan(SqlPlan physicalPlan, Map<String, PlaceholderVal
List<String> sqlList = physicalPlan.getSqlList();
for (String sql : sqlList)
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
relationalExecutionHelper.executeStatement(enrichedSql);
}
}
Expand Down Expand Up @@ -104,8 +106,8 @@ public List<TabularData> executePhysicalPlanAndGetResults(SqlPlan physicalPlan,
List<TabularData> resultSetList = new ArrayList<>();
for (String sql : physicalPlan.getSqlList())
{
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues);
String enrichedSql = SqlUtils.getEnrichedSql(placeholderKeyValues, sql, BATCH_ID_PATTERN);
SqlUtils.logSql(LOGGER, sqlLogging, sql, enrichedSql, placeholderKeyValues, BATCH_ID_PATTERN);
TabularData queryResultData = relationalExecutionHelper.executeQueryAndGetResultsAsTabularData(enrichedSql);
if (!queryResultData.data().isEmpty())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,24 +620,24 @@ private void appendLoadQueryStats(Executor<SqlGen, TabularData, SqlPlan> executo
{
case QueryStatsLogicalPlanUtils.EXTERNAL_SCAN_STAGE:
Object externalScan = queryStats.get(QueryStatsLogicalPlanUtils.EXTERNAL_BYTES_SCANNED_ALIAS);
if (externalScan != null)
if (externalScan != null && !String.valueOf(externalScan).isEmpty())
{
stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, externalScan);
stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, Long.parseLong(String.valueOf(externalScan)));
}
else
{
stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, 0);
stats.put(StatisticName.INPUT_FILES_BYTES_SCANNED, 0L);
}
break;
case QueryStatsLogicalPlanUtils.INSERT_STAGE:
Object insert = queryStats.get(QueryStatsLogicalPlanUtils.INPUT_ROWS_ALIAS);
if (insert != null)
if (insert != null && !String.valueOf(insert).isEmpty())
{
stats.put(StatisticName.INCOMING_RECORD_COUNT, insert);
stats.put(StatisticName.INCOMING_RECORD_COUNT, Long.parseLong(String.valueOf(insert)));
}
else
{
stats.put(StatisticName.INCOMING_RECORD_COUNT, 0);
stats.put(StatisticName.INCOMING_RECORD_COUNT, 0L);
}
break;
}
Expand Down

0 comments on commit 2e62720

Please sign in to comment.