Skip to content

Commit

Permalink
Refactor DriverExecutorFacade (#31598)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Jun 5, 2024
1 parent feb2de8 commit f276c61
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,33 @@

package org.apache.shardingsphere.driver.executor;

import lombok.Getter;
import org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteCallback;
import org.apache.shardingsphere.driver.executor.callback.execute.StatementExecuteUpdateCallback;
import org.apache.shardingsphere.driver.executor.callback.keygen.GeneratedKeyCallback;
import org.apache.shardingsphere.driver.executor.callback.replay.PreparedStatementParametersReplayCallback;
import org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
* Driver executor facade.
Expand All @@ -38,16 +54,12 @@ public final class DriverExecutorFacade implements AutoCloseable {

private final SQLFederationEngine sqlFederationEngine;

@Getter
private final DriverExecuteQueryExecutor queryExecutor;

@Getter
private final DriverExecuteUpdateExecutor updateExecutor;

@Getter
private final DriverExecuteExecutor executeExecutor;

@Getter
private final DriverExecuteBatchExecutor executeBatchExecutor;

public DriverExecutorFacade(final ShardingSphereConnection connection) {
Expand All @@ -68,6 +80,103 @@ public DriverExecutorFacade(final ShardingSphereConnection connection, final Sha
executeBatchExecutor = null == database ? null : new DriverExecuteBatchExecutor(connection, metaData, database, jdbcExecutor);
}

/**
* Execute query.
*
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
* @param statement statement
* @param columnLabelAndIndexMap column label and index map
* @param addCallback statement add callback
* @param replayCallback statement replay callback
* @return result set
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public ResultSet executeQuery(final ShardingSphereDatabase database, final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final Statement statement, final Map<String, Integer> columnLabelAndIndexMap,
final StatementAddCallback addCallback, final StatementReplayCallback replayCallback) throws SQLException {
return queryExecutor.executeQuery(database, queryContext, prepareEngine, statement, columnLabelAndIndexMap, addCallback, replayCallback);
}

/**
* Execute update.
*
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
* @param updateCallback statement execute update callback
* @param replayCallback statement replay callback
* @param addCallback statement add callback
* @return updated row count
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public int executeUpdate(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteUpdateCallback updateCallback, final StatementAddCallback addCallback, final StatementReplayCallback replayCallback) throws SQLException {
return updateExecutor.executeUpdate(database, queryContext, prepareEngine, updateCallback, addCallback, replayCallback);
}

/**
* Execute.
*
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
* @param executeCallback statement execute callback
* @param addCallback statement add callback
* @param replayCallback statement replay callback
* @return execute result
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public boolean execute(final ShardingSphereDatabase database, final QueryContext queryContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementExecuteCallback executeCallback, final StatementAddCallback addCallback, final StatementReplayCallback replayCallback) throws SQLException {
return executeExecutor.execute(database, queryContext, prepareEngine, executeCallback, addCallback, replayCallback);
}

/**
* Get result set.
*
* @return result set
*/
public Optional<ResultSet> getResultSet() {
return executeExecutor.getResultSet();
}

/**
* Add batch.
*
* @param queryContext query context
* @param database database
*/
public void addBatch(final QueryContext queryContext, final ShardingSphereDatabase database) {
executeBatchExecutor.addBatch(queryContext, database);
}

/**
* Execute batch.
*
* @param database database
* @param sqlStatementContext SQL statement context
* @param generatedValues generated values
* @param statementOption statement option
* @param prepareEngine prepare engine
* @param addCallback statement add callback
* @param replayCallback prepared statement parameters replay callback
* @param generatedKeyCallback generated key callback
* @return generated keys
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public int[] executeBatch(final ShardingSphereDatabase database, final SQLStatementContext sqlStatementContext, final Collection<Comparable<?>> generatedValues,
final StatementOption statementOption, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementAddCallback addCallback, final PreparedStatementParametersReplayCallback replayCallback,
final GeneratedKeyCallback generatedKeyCallback) throws SQLException {
return executeBatchExecutor.executeBatch(database, sqlStatementContext, generatedValues, statementOption, prepareEngine, addCallback, replayCallback, generatedKeyCallback);
}

/**
* Clear.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public ResultSet executeQuery() throws SQLException {
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
currentResultSet = driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
currentResultSet = driverExecutorFacade.executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, columnLabelAndIndexMap,
(StatementAddCallback<PreparedStatement>) this::addStatements, this::replay);
if (currentResultSet instanceof ShardingSphereResultSet) {
columnLabelAndIndexMap = ((ShardingSphereResultSet) currentResultSet).getColumnLabelAndIndexMap();
Expand Down Expand Up @@ -231,7 +231,7 @@ public int executeUpdate() throws SQLException {
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
int result = driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database),
int result = driverExecutorFacade.executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database),
(sql, statement) -> ((PreparedStatement) statement).executeUpdate(), (StatementAddCallback<PreparedStatement>) this::addStatements, this::replay);
findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
return result;
Expand All @@ -256,8 +256,7 @@ public boolean execute() throws SQLException {
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
boolean result = driverExecutorFacade.getExecuteExecutor().execute(
database, queryContext, createDriverExecutionPrepareEngine(database), (sql, statement) -> ((PreparedStatement) statement).execute(),
boolean result = driverExecutorFacade.execute(database, queryContext, createDriverExecutionPrepareEngine(database), (sql, statement) -> ((PreparedStatement) statement).execute(),
(StatementAddCallback<PreparedStatement>) this::addStatements, this::replay);
findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
return result;
Expand All @@ -276,7 +275,7 @@ public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
Optional<ResultSet> resultSet = driverExecutorFacade.getExecuteExecutor().getResultSet();
Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
if (resultSet.isPresent()) {
return resultSet.get();
}
Expand Down Expand Up @@ -380,7 +379,7 @@ private String getGeneratedKeysColumnName(final String columnName) {
public void addBatch() {
currentResultSet = null;
QueryContext queryContext = createQueryContext();
driverExecutorFacade.getExecuteBatchExecutor().addBatch(queryContext, metaData.getDatabase(databaseName));
driverExecutorFacade.addBatch(queryContext, metaData.getDatabase(databaseName));
findGeneratedKey().ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues()));
clearParameters();
}
Expand All @@ -389,7 +388,7 @@ public void addBatch() {
public int[] executeBatch() throws SQLException {
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
try {
return driverExecutorFacade.getExecuteBatchExecutor().executeBatch(database, sqlStatementContext, generatedValues, statementOption,
return driverExecutorFacade.executeBatch(database, sqlStatementContext, generatedValues, statementOption,
createDriverExecutionPrepareEngine(database), (StatementAddCallback<PreparedStatement>) (statements, parameterSets) -> this.statements.addAll(statements),
this::replaySetParameter,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public ResultSet executeQuery(final String sql) throws SQLException {
try {
prepareExecute(queryContext);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
currentResultSet = driverExecutorFacade.getQueryExecutor().executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, null,
currentResultSet = driverExecutorFacade.executeQuery(database, queryContext, createDriverExecutionPrepareEngine(database), this, null,
(StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
return currentResultSet;
// CHECKSTYLE:OFF
Expand Down Expand Up @@ -196,7 +196,7 @@ private int executeUpdate(final String sql, final StatementExecuteUpdateCallback
QueryContext queryContext = createQueryContext(sql);
prepareExecute(queryContext);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
return driverExecutorFacade.getUpdateExecutor().executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database),
return driverExecutorFacade.executeUpdate(database, queryContext, createDriverExecutionPrepareEngine(database),
updateCallback, (StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
}

Expand Down Expand Up @@ -258,7 +258,7 @@ private boolean execute(final String sql, final StatementExecuteCallback stateme
QueryContext queryContext = createQueryContext(sql);
prepareExecute(queryContext);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
return driverExecutorFacade.getExecuteExecutor().execute(database, queryContext, createDriverExecutionPrepareEngine(database), statementExecuteCallback,
return driverExecutorFacade.execute(database, queryContext, createDriverExecutionPrepareEngine(database), statementExecuteCallback,
(StatementAddCallback<Statement>) (statements, parameterSets) -> this.statements.addAll(statements), this::replay);
}

Expand Down Expand Up @@ -325,7 +325,7 @@ public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
Optional<ResultSet> resultSet = driverExecutorFacade.getExecuteExecutor().getResultSet();
Optional<ResultSet> resultSet = driverExecutorFacade.getResultSet();
if (resultSet.isPresent()) {
return resultSet.get();
}
Expand Down

0 comments on commit f276c61

Please sign in to comment.