diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java index 3d5f556970a..75cf32a3a55 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-logical-plan/src/main/java/org/finos/legend/engine/persistence/components/common/FileFormat.java @@ -16,20 +16,8 @@ public enum FileFormat { - CSV("CSV"), - JSON("JSON"), - AVRO("AVRO"), - PARQUET("PARQUET"); - - String name; - - FileFormat(String name) - { - this.name = name; - } - - public String getName() - { - return this.name; - } + CSV, + JSON, + AVRO, + PARQUET; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java index d2da804ec33..046d2088a44 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryExecutor.java @@ -75,15 +75,12 @@ public Map executeLoadPhysicalPlanAndGetStats(SqlPlan phy { List sqlList = physicalPlan.getSqlList(); - // The first SQL is a load statement - // Executed in a new transaction + // Load statement (Not supported in Bigquery to run in a transaction) Map loadStats = bigQueryHelper.executeLoadStatement(getEnrichedSql(placeholderKeyValues, sqlList.get(0))); - // The second SQL is an insert statement - // We need to first close the current transaction (if it exists) and open a new transaction - // Such that the result of the Load will be available to the Insert - bigQueryHelper.close(); - bigQueryHelper.executeStatement(getEnrichedSql(placeholderKeyValues, sqlList.get(1))); + // Isolation level of Bigquery is Snapshot, + // So Insert statement has to run in a new transaction so that it can see the changes of Load + bigQueryHelper.executeStatementInANewTransaction(getEnrichedSql(placeholderKeyValues, sqlList.get(1))); return loadStats; } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java index 1336c32eda2..261226baa04 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/executor/BigQueryHelper.java @@ -327,6 +327,12 @@ public void executeStatement(String sql) executeStatements(sqls); } + public void executeStatementInANewTransaction(String sql) + { + List sqls = Collections.singletonList(sql); + executeStatementsInANewTransaction(sqls); + } + // Execute statements in a transaction - either use an existing one or use a new one public void executeStatements(List sqls) { @@ -346,45 +352,50 @@ public void executeStatements(List sqls) } else { - BigQueryTransactionManager txManager = null; - try + executeStatementsInANewTransaction(sqls); + } + } + + public void executeStatementsInANewTransaction(List sqls) + { + BigQueryTransactionManager txManager = null; + try + { + txManager = new BigQueryTransactionManager(bigQuery); + txManager.beginTransaction(); + for (String sql : sqls) { - txManager = new BigQueryTransactionManager(bigQuery); - txManager.beginTransaction(); - for (String sql : sqls) - { - txManager.executeInCurrentTransaction(sql); - } - txManager.commitTransaction(); + txManager.executeInCurrentTransaction(sql); } - catch (Exception e) + txManager.commitTransaction(); + } + catch (Exception e) + { + LOGGER.error("Error executing SQL statements: " + sqls, e); + if (txManager != null) { - LOGGER.error("Error executing SQL statements: " + sqls, e); - if (txManager != null) + try { - try - { - txManager.revertTransaction(); - } - catch (InterruptedException e2) - { - throw new RuntimeException(e2); - } + txManager.revertTransaction(); + } + catch (InterruptedException e2) + { + throw new RuntimeException(e2); } - throw new RuntimeException(e); } - finally + throw new RuntimeException(e); + } + finally + { + if (txManager != null) { - if (txManager != null) + try { - try - { - txManager.close(); - } - catch (InterruptedException e) - { - LOGGER.error("Error closing transaction manager.", e); - } + txManager.close(); + } + catch (InterruptedException e) + { + LOGGER.error("Error closing transaction manager.", e); } } } diff --git a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java index 82f7045a630..0c5e7d91bc5 100644 --- a/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java +++ b/legend-engine-xts-persistence/legend-engine-xt-persistence-component/legend-engine-xt-persistence-component-relational-bigquery/src/main/java/org/finos/legend/engine/persistence/components/relational/bigquery/sql/visitor/StagedFilesDatasetReferenceVisitor.java @@ -40,7 +40,7 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu Map loadOptionsMap = new HashMap<>(); FileFormat fileFormat = datasetProperties.fileFormat(); - loadOptionsMap.put("format", fileFormat.getName()); + loadOptionsMap.put("format", fileFormat.name()); datasetProperties.loadOptions().ifPresent(options -> retrieveLoadOptions(fileFormat, options, loadOptionsMap)); StagedFilesTable stagedFilesTable = new StagedFilesTable(datasetProperties.files(), loadOptionsMap);