Skip to content

Commit

Permalink
Address Code Review Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
prasar-ashutosh committed Oct 19, 2023
1 parent 9ecee88 commit f611912
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,8 @@

public enum FileFormat
{
CSV("CSV"),
JSON("JSON"),
AVRO("AVRO"),
PARQUET("PARQUET");

String name;

FileFormat(String name)
{
this.name = name;
}

public String getName()
{
return this.name;
}
CSV,
JSON,
AVRO,
PARQUET;
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,12 @@ public Map<StatisticName, Object> executeLoadPhysicalPlanAndGetStats(SqlPlan phy
{
List<String> sqlList = physicalPlan.getSqlList();

// The first SQL is a load statement
// Executed in a new transaction
// Load statement (Not supported in Bigquery to run in a transaction)
Map<StatisticName, Object> loadStats = bigQueryHelper.executeLoadStatement(getEnrichedSql(placeholderKeyValues, sqlList.get(0)));

// The second SQL is an insert statement
// We need to first close the current transaction (if it exists) and open a new transaction
// Such that the result of the Load will be available to the Insert
bigQueryHelper.close();
bigQueryHelper.executeStatement(getEnrichedSql(placeholderKeyValues, sqlList.get(1)));
// Isolation level of Bigquery is Snapshot,
// So Insert statement has to run in a new transaction so that it can see the changes of Load
bigQueryHelper.executeStatementInANewTransaction(getEnrichedSql(placeholderKeyValues, sqlList.get(1)));
return loadStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ public void executeStatement(String sql)
executeStatements(sqls);
}

public void executeStatementInANewTransaction(String sql)
{
List<String> sqls = Collections.singletonList(sql);
executeStatementsInANewTransaction(sqls);
}

// Execute statements in a transaction - either use an existing one or use a new one
public void executeStatements(List<String> sqls)
{
Expand All @@ -346,45 +352,50 @@ public void executeStatements(List<String> sqls)
}
else
{
BigQueryTransactionManager txManager = null;
try
executeStatementsInANewTransaction(sqls);
}
}

public void executeStatementsInANewTransaction(List<String> sqls)
{
BigQueryTransactionManager txManager = null;
try
{
txManager = new BigQueryTransactionManager(bigQuery);
txManager.beginTransaction();
for (String sql : sqls)
{
txManager = new BigQueryTransactionManager(bigQuery);
txManager.beginTransaction();
for (String sql : sqls)
{
txManager.executeInCurrentTransaction(sql);
}
txManager.commitTransaction();
txManager.executeInCurrentTransaction(sql);
}
catch (Exception e)
txManager.commitTransaction();
}
catch (Exception e)
{
LOGGER.error("Error executing SQL statements: " + sqls, e);
if (txManager != null)
{
LOGGER.error("Error executing SQL statements: " + sqls, e);
if (txManager != null)
try
{
try
{
txManager.revertTransaction();
}
catch (InterruptedException e2)
{
throw new RuntimeException(e2);
}
txManager.revertTransaction();
}
catch (InterruptedException e2)
{
throw new RuntimeException(e2);
}
throw new RuntimeException(e);
}
finally
throw new RuntimeException(e);
}
finally
{
if (txManager != null)
{
if (txManager != null)
try
{
try
{
txManager.close();
}
catch (InterruptedException e)
{
LOGGER.error("Error closing transaction manager.", e);
}
txManager.close();
}
catch (InterruptedException e)
{
LOGGER.error("Error closing transaction manager.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public VisitorResult visit(PhysicalPlanNode prev, StagedFilesDatasetReference cu

Map<String, Object> loadOptionsMap = new HashMap<>();
FileFormat fileFormat = datasetProperties.fileFormat();
loadOptionsMap.put("format", fileFormat.getName());
loadOptionsMap.put("format", fileFormat.name());
datasetProperties.loadOptions().ifPresent(options -> retrieveLoadOptions(fileFormat, options, loadOptionsMap));

StagedFilesTable stagedFilesTable = new StagedFilesTable(datasetProperties.files(), loadOptionsMap);
Expand Down

0 comments on commit f611912

Please sign in to comment.