Skip to content

Commit

Permalink
consolidate logic for ExecuteFetchAsDba and ExecuteMultiFetchAsDba
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Mar 18, 2024
1 parent 2cd06ea commit c2dd2f2
Showing 1 changed file with 68 additions and 111 deletions.
179 changes: 68 additions & 111 deletions go/vt/vttablet/tabletmanager/rpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,17 @@ func analyzeExecuteFetchAsDbaMultiQuery(sql string, parser *sqlparser.Parser) (q
return queries, parseable, countCreate, allowZeroInDate, nil
}

// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema.
func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) {
// ExecuteMultiFetchAsDba will execute the given queries, possibly disabling binlogs and reload schema.
func (tm *TabletManager) executeMultiFetchAsDba(
ctx context.Context,
dbName string,
sql string,
maxRows int,
reloadSchema bool,
disableBinlogs bool,
disableForeignKeyChecks bool,
validateQueries func(queries []string, countCreate int) error,
) ([]*querypb.QueryResult, error) {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
Expand All @@ -81,65 +90,63 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
defer conn.Close()

// Disable binlogs if necessary.
if req.DisableBinlogs {
if disableBinlogs {
_, err := conn.ExecuteFetch("SET sql_log_bin = OFF", 0, false)
if err != nil {
return nil, err
}
}

// Disable FK checks if requested.
if req.DisableForeignKeyChecks {
if disableForeignKeyChecks {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = OFF", 0, false)
if err != nil {
return nil, err
}
}

if req.DbName != "" {
if dbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
_, _ = conn.ExecuteFetch("USE "+sqlescape.EscapeID(req.DbName), 1, false)
_, _ = conn.ExecuteFetch("USE "+sqlescape.EscapeID(dbName), 1, false)
}

statements, _, countCreate, allowZeroInDate, err := analyzeExecuteFetchAsDbaMultiQuery(string(req.Query), tm.Env.Parser())
queries, _, countCreate, allowZeroInDate, err := analyzeExecuteFetchAsDbaMultiQuery(sql, tm.Env.Parser())
if err != nil {
return nil, err
}
if len(statements) > 1 {
// Up to v19, we allow multi-statement SQL in ExecuteFetchAsDba, but only for the specific case
// where all statements are CREATE TABLE or CREATE VIEW. This is to support `ApplySchema --batch-size`.
// In v20, we will not support multi statements whatsoever.
// v20 will throw an error by virtua of using ExecuteFetch instead of ExecuteFetchMulti.
if countCreate != len(statements) {
return nil, vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "multi statement queries are not supported in ExecuteFetchAsDba unless all are CREATE TABLE or CREATE VIEW")
if validateQueries != nil {
if err := validateQueries(queries, countCreate); err != nil {
return nil, err
}
}
if allowZeroInDate {
if _, err := conn.ExecuteFetch("set @@session.sql_mode=REPLACE(REPLACE(@@session.sql_mode, 'NO_ZERO_DATE', ''), 'NO_ZERO_IN_DATE', '')", 1, false); err != nil {
return nil, err
}
}
// Replace any provided sidecar database qualifiers with the correct one.
// TODO(shlomi): we use ReplaceTableQualifiersMultiQuery for backwards compatibility. In v20 we will not accept
// multi statement queries in ExecuteFetchAsDBA. This will be rewritten as ReplaceTableQualifiers()
uq, err := tm.Env.Parser().ReplaceTableQualifiersMultiQuery(string(req.Query), sidecar.DefaultName, sidecar.GetName())
uq, err := tm.Env.Parser().ReplaceTableQualifiersMultiQuery(sql, sidecar.DefaultName, sidecar.GetName())
if err != nil {
return nil, err
}
// TODO(shlomi): we use ExecuteFetchMulti for backwards compatibility. In v20 we will not accept
// multi statement queries in ExecuteFetchAsDBA. This will be rewritten as:
// (in v20): result, err := ExecuteFetch(uq, int(req.MaxRows), true /*wantFields*/)
result, more, err := conn.ExecuteFetchMulti(uq, int(req.MaxRows), true /*wantFields*/)
results := make([]*querypb.QueryResult, 0, len(queries))
result, more, err := conn.ExecuteFetchMulti(uq, maxRows, true /*wantFields*/)
if err == nil {
results = append(results, sqltypes.ResultToProto3(result))
}
for more {
_, more, _, err = conn.ReadQueryResult(0, false)
result, more, _, err = conn.ReadQueryResult(maxRows, true /*wantFields*/)
if err != nil {
return nil, err
}
results = append(results, sqltypes.ResultToProto3(result))
}

// Re-enable FK checks if necessary.
if req.DisableForeignKeyChecks && !conn.IsClosed() {
if disableForeignKeyChecks && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = ON", 0, false)
if err != nil {
// If we can't reset the FK checks flag,
Expand All @@ -149,7 +156,7 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
}

// Re-enable binlogs if necessary.
if req.DisableBinlogs && !conn.IsClosed() {
if disableBinlogs && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
// if we can't reset the sql_log_bin flag,
Expand All @@ -158,107 +165,57 @@ func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanag
}
}

if err == nil && req.ReloadSchema {
if err == nil && reloadSchema {
reloadErr := tm.QueryServiceControl.ReloadSchema(ctx)
if reloadErr != nil {
log.Errorf("failed to reload the schema %v", reloadErr)
}
}
return sqltypes.ResultToProto3(result), err
return results, err
}

// ExecuteMultiFetchAsDba will execute the given queries, possibly disabling binlogs and reload schema.
func (tm *TabletManager) ExecuteMultiFetchAsDba(ctx context.Context, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) {
if err := tm.waitForGrantsToHaveApplied(ctx); err != nil {
return nil, err
}
// Get a connection.
conn, err := tm.MysqlDaemon.GetDbaConnection(ctx)
if err != nil {
return nil, err
}
defer conn.Close()

// Disable binlogs if necessary.
if req.DisableBinlogs {
_, err := conn.ExecuteFetch("SET sql_log_bin = OFF", 0, false)
if err != nil {
return nil, err
}
}

// Disable FK checks if requested.
if req.DisableForeignKeyChecks {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = OFF", 0, false)
if err != nil {
return nil, err
}
}

if req.DbName != "" {
// This execute might fail if db does not exist.
// Error is ignored because given query might create this database.
_, _ = conn.ExecuteFetch("USE "+sqlescape.EscapeID(req.DbName), 1, false)
}

queries, _, _, allowZeroInDate, err := analyzeExecuteFetchAsDbaMultiQuery(string(req.Sql), tm.Env.Parser())
if err != nil {
return nil, err
}
if allowZeroInDate {
if _, err := conn.ExecuteFetch("set @@session.sql_mode=REPLACE(REPLACE(@@session.sql_mode, 'NO_ZERO_DATE', ''), 'NO_ZERO_IN_DATE', '')", 1, false); err != nil {
return nil, err
}
}
// Replace any provided sidecar database qualifiers with the correct one.
// TODO(shlomi): we use ReplaceTableQualifiersMultiQuery for backwards compatibility. In v20 we will not accept
// multi statement queries in ExecuteFetchAsDBA. This will be rewritten as ReplaceTableQualifiers()
uq, err := tm.Env.Parser().ReplaceTableQualifiersMultiQuery(string(req.Sql), sidecar.DefaultName, sidecar.GetName())
// ExecuteFetchAsDba will execute the given query, possibly disabling binlogs and reload schema.
func (tm *TabletManager) ExecuteFetchAsDba(ctx context.Context, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) {
results, err := tm.executeMultiFetchAsDba(
ctx,
req.DbName,
string(req.Query),
int(req.MaxRows),
req.ReloadSchema,
req.DisableBinlogs,
req.DisableForeignKeyChecks,
func(queries []string, countCreate int) error {
// Up to v19, we allow multi-statement SQL in ExecuteFetchAsDba, but only for the specific case
// where all statements are CREATE TABLE or CREATE VIEW. This is to support `ApplySchema --batch-size`.
// In v20, we will not support multi statements whatsoever.
// v20 will throw an error by virtua of using ExecuteFetch instead of ExecuteFetchMulti.
if len(queries) > 1 && len(queries) != countCreate {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "multi statement queries are not supported in ExecuteFetchAsDba unless all are CREATE TABLE or CREATE VIEW")
}
return nil
},
)
if err != nil {
return nil, err
}
// TODO(shlomi): we use ExecuteFetchMulti for backwards compatibility. In v20 we will not accept
// multi statement queries in ExecuteFetchAsDBA. This will be rewritten as:
// (in v20): result, err := ExecuteFetch(uq, int(req.MaxRows), true /*wantFields*/)
results := make([]*querypb.QueryResult, 0, len(queries))
result, more, err := conn.ExecuteFetchMulti(uq, int(req.MaxRows), true /*wantFields*/)
if err == nil {
results = append(results, sqltypes.ResultToProto3(result))
}
for more {
result, more, _, err = conn.ReadQueryResult(int(req.MaxRows), true /*wantFields*/)
if err != nil {
return nil, err
}
results = append(results, sqltypes.ResultToProto3(result))
}

// Re-enable FK checks if necessary.
if req.DisableForeignKeyChecks && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET SESSION foreign_key_checks = ON", 0, false)
if err != nil {
// If we can't reset the FK checks flag,
// let's just close the connection.
conn.Close()
}
}

// Re-enable binlogs if necessary.
if req.DisableBinlogs && !conn.IsClosed() {
_, err := conn.ExecuteFetch("SET sql_log_bin = ON", 0, false)
if err != nil {
// if we can't reset the sql_log_bin flag,
// let's just close the connection.
conn.Close()
}
if len(results) == 0 {
return nil, vterrors.Errorf(vtrpc.Code_INTERNAL, "received no query results in ExecuteFetchAsDba. Expcted at least 1")
}
return results[0], nil
}

if err == nil && req.ReloadSchema {
reloadErr := tm.QueryServiceControl.ReloadSchema(ctx)
if reloadErr != nil {
log.Errorf("failed to reload the schema %v", reloadErr)
}
}
// ExecuteMultiFetchAsDba will execute the given queries, possibly disabling binlogs and reload schema.
func (tm *TabletManager) ExecuteMultiFetchAsDba(ctx context.Context, req *tabletmanagerdatapb.ExecuteMultiFetchAsDbaRequest) ([]*querypb.QueryResult, error) {
results, err := tm.executeMultiFetchAsDba(
ctx,
req.DbName,
string(req.Sql),
int(req.MaxRows),
req.ReloadSchema,
req.DisableBinlogs,
req.DisableForeignKeyChecks,
nil, // Validation query is not needed for ExecuteMultiFetchAsDba
)
return results, err
}

Expand Down

0 comments on commit c2dd2f2

Please sign in to comment.