Skip to content

Commit

Permalink
Allow select statement in before_load and after_load
Browse files Browse the repository at this point in the history
Add execute method.
  • Loading branch information
hiroyuki-sato committed Jul 23, 2023
1 parent dfb2192 commit 80e6eb2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ public JdbcSchema get()
// direct modify mode doesn't need intermediate tables.
task.setIntermediateTables(Optional.<List<TableIdentifier>>empty());
if (task.getBeforeLoad().isPresent()) {
con.executeSql(task.getBeforeLoad().get());
con.executeSqlStatement(task.getBeforeLoad().get());
}
}

Expand Down Expand Up @@ -874,7 +874,7 @@ protected void doCommit(JdbcOutputConnection con, PluginTask task, int taskCount
case MERGE_DIRECT:
// already loaded
if (task.getAfterLoad().isPresent()) {
con.executeSql(task.getAfterLoad().get());
con.executeSqlStatement(task.getAfterLoad().get());
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,19 @@ protected void executeSql(String sql) throws SQLException
}
}

protected void executeSqlStatement(String sql) throws SQLException
{
Statement stmt = connection.createStatement();
try {
execute(stmt, sql);
commitIfNecessary(connection);
} catch (SQLException ex) {
throw safeRollback(connection, ex);
} finally {
stmt.close();
}
}

protected void collectInsert(List<TableIdentifier> fromTables, JdbcSchema schema, TableIdentifier toTable,
boolean truncateDestinationFirst, Optional<String> preSql, Optional<String> postSql) throws SQLException
{
Expand All @@ -398,14 +411,14 @@ protected void collectInsert(List<TableIdentifier> fromTables, JdbcSchema schema
}

if (preSql.isPresent()) {
executeUpdate(stmt, preSql.get());
execute(stmt, preSql.get());
}

String sql = buildCollectInsertSql(fromTables, schema, toTable);
executeUpdate(stmt, sql);

if (postSql.isPresent()) {
executeUpdate(stmt, postSql.get());
execute(stmt, postSql.get());
}

commitIfNecessary(connection);
Expand Down Expand Up @@ -462,14 +475,14 @@ protected void collectMerge(List<TableIdentifier> fromTables, JdbcSchema schema,
Statement stmt = connection.createStatement();
try {
if (preSql.isPresent()) {
executeUpdate(stmt, preSql.get());
execute(stmt, preSql.get());
}

String sql = buildCollectMergeSql(fromTables, schema, toTable, mergeConfig);
executeUpdate(stmt, sql);

if (postSql.isPresent()) {
executeUpdate(stmt, postSql.get());
execute(stmt, postSql.get());
}

commitIfNecessary(connection);
Expand All @@ -494,7 +507,7 @@ public void replaceTable(TableIdentifier fromTable, JdbcSchema schema, TableIden
executeUpdate(stmt, buildRenameTableSql(fromTable, toTable));

if (postSql.isPresent()) {
executeUpdate(stmt, postSql.get());
execute(stmt, postSql.get());
}

commitIfNecessary(connection);
Expand Down Expand Up @@ -630,6 +643,20 @@ protected int executeUpdate(Statement stmt, String sql) throws SQLException
return count;
}

protected boolean execute(Statement stmt, String sql) throws SQLException
{
logger.info("SQL: " + sql);
long startTime = System.currentTimeMillis();
boolean result = stmt.execute(sql);
double seconds = (System.currentTimeMillis() - startTime) / 1000.0;
if (result) {
logger.info(String.format("> succeed %.2f seconds", seconds));
} else {
logger.info(String.format("> failed %.2f seconds", seconds));
}
return result;
}

protected void commitIfNecessary(Connection con) throws SQLException
{
if (!con.getAutoCommit()) {
Expand Down

0 comments on commit 80e6eb2

Please sign in to comment.