Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecuteFetch: error on multiple result sets #14949

Merged
merged 48 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
222a174
ExecuteFetch: error on multiple results
shlomi-noach Jan 15, 2024
0125804
more suggestive description
shlomi-noach Jan 15, 2024
48bc86d
exhaust further result sets. Prioritize multi-results error over resu…
shlomi-noach Jan 15, 2024
232da20
break down ExecuteFetch multi-statements
shlomi-noach Jan 15, 2024
a6d97f5
ExecuteFetchMultiDrain
shlomi-noach Jan 15, 2024
4e3544f
remove uneccessary (though unharmful) semicolon
shlomi-noach Jan 15, 2024
a4d0271
fix multi statements
shlomi-noach Jan 15, 2024
7b28d8f
fix multi statements
shlomi-noach Jan 15, 2024
2cde946
fix multi statements
shlomi-noach Jan 15, 2024
7867fb2
fix multi statements
shlomi-noach Jan 16, 2024
0c88aa8
fix multi statements
shlomi-noach Jan 16, 2024
c039998
fix multi statements
shlomi-noach Jan 16, 2024
313487c
fix test rewrite
shlomi-noach Jan 16, 2024
9adac78
collect errors rather than number of errors
shlomi-noach Jan 16, 2024
1690cc0
fix multi statements
shlomi-noach Jan 16, 2024
1003574
useDB rather than 'USE ...' statement
shlomi-noach Jan 16, 2024
7be46ad
Fix failover test for reparenting
dbussink Jan 16, 2024
3f8a010
use QueryTabletMultiple
shlomi-noach Jan 16, 2024
c26b25a
Merge branch 'main' into execute-fetch-error-more
shlomi-noach Jan 18, 2024
245b9bc
fix multi statements
shlomi-noach Jan 22, 2024
8e30a12
log query/queries in error message
shlomi-noach Jan 22, 2024
2ef3fa5
RunSQLs runs queries in single connection
shlomi-noach Jan 22, 2024
ed7e27a
fix multi statements
shlomi-noach Jan 22, 2024
02a39e8
fix multi statements
shlomi-noach Jan 22, 2024
b15e38d
clearer error message
shlomi-noach Jan 22, 2024
953330f
QueryTablet loop -> QueryTabletMultiple with no loop
shlomi-noach Jan 22, 2024
2147a8b
fix multi statements
shlomi-noach Jan 22, 2024
28b42a2
fix multi statements
shlomi-noach Jan 22, 2024
3d05639
fix multi statements
shlomi-noach Jan 22, 2024
b87350e
rewording
shlomi-noach Jan 22, 2024
760c64d
fix multi statements
shlomi-noach Jan 22, 2024
dab41b1
generic fix for multi-statements in NewMySQL()
shlomi-noach Jan 22, 2024
8f07a3b
fix multi statements
shlomi-noach Jan 22, 2024
6e1ec2f
simplify test error check
shlomi-noach Jan 22, 2024
5b1d205
Ignore sqlerror.ERNonExistingGrant
shlomi-noach Jan 22, 2024
262a8da
UnwrappedIs
shlomi-noach Jan 22, 2024
ca09be0
turn ErrExecuteFetchMultipleResults to existing/expected muti-result …
shlomi-noach Jan 22, 2024
fb6f8d4
feat: update vtorc tests to function properly
GuptaManan100 Jan 23, 2024
dcffe94
test both before and after multi-result procs
shlomi-noach Jan 24, 2024
404ddb6
do not limit rows, so that we can consume them all
shlomi-noach Jan 24, 2024
fbb23a6
remove grant relaxation patch
shlomi-noach Jan 24, 2024
4351f12
resolved conflict
shlomi-noach Jan 24, 2024
8e1eb7f
refactor: move drain to it's own little method
harshit-gangal Jan 24, 2024
50c2243
fix test: drop table (previously there was a hidden 'Table 'test_idx'…
shlomi-noach Jan 24, 2024
a30c804
FETCH_NO_ROWS
shlomi-noach Jan 30, 2024
39ab378
multi-drain still fully reads packet rows, just not into memory
shlomi-noach Jan 31, 2024
db5de4f
code comments
shlomi-noach Jan 31, 2024
9620d3c
errors: do not re-implement errors.Is
vmg Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions go/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package errors

import "errors"

// Wrapped is used to unwrap an error created by errors.Join() in Go 1.20
type Wrapped interface {
Unwrap() []error
Expand Down Expand Up @@ -54,3 +56,13 @@ func UnwrapFirst(err error) error {
}
return UnwrapAll(err)[0]
}

// UnwrappedIs returns 'true' if any of the unwrapped error complies with golang's errors.Is(target)
func UnwrappedIs(err, target error) bool {
for _, serr := range UnwrapAll(err) {
if errors.Is(serr, target) {
return true
}
}
return false
}
60 changes: 60 additions & 0 deletions go/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,63 @@ func TestUnwrap(t *testing.T) {
})
}
}

func TestUnwrappedIs(t *testing.T) {
var err1 = errors.New("err1")

tcases := []struct {
err error
target error
expect bool
}{
{
err: nil,
expect: false,
},
{
err: nil,
target: errors.New("err1"),
expect: false,
},
{
err: errors.New("err1"),
target: errors.New("err1"),
expect: false,
},
{
err: errors.New("err1"),
target: err1,
expect: false,
},
{
err: err1,
target: err1,
expect: true,
},
{
err: errors.Join(err1, errors.New("err2")),
target: err1,
expect: true,
},
{
err: errors.Join(errors.New("err2"), err1),
target: err1,
expect: true,
},
{
err: errors.Join(errors.New("err2"), errors.Join(errors.New("err3"), err1)),
target: err1,
expect: true,
},
}
for _, tcase := range tcases {
name := "nil"
if tcase.err != nil {
name = tcase.err.Error()
}
t.Run(name, func(t *testing.T) {
is := UnwrappedIs(tcase.err, tcase.target)
assert.Equal(t, tcase.expect, is)
})
}
}
13 changes: 12 additions & 1 deletion go/mysql/endtoend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,11 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
assert.EqualValues(t, 1, result.RowsAffected, "insert into returned RowsAffected")
}

qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ' updated'); select * from a; select count(*) from a", 300, true)
// Verify that a ExecuteFetchMultiDrain leaves the connection/packet in valid state.
err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 1'); select * from a; select count(*) from a")
expectNoError(t, err)
// If the previous command leaves packet in invalid state, this will fail.
qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ', fetch multi'); select * from a; select count(*) from a", 300, true)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(multi result)", more, true)
assert.EqualValues(t, 255, qr.RowsAffected)
Expand All @@ -225,6 +229,13 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
expectFlag(t, "ReadQueryResult(2)", more, false)
assert.EqualValues(t, 1, len(qr.Rows), "ReadQueryResult(1)")

// Verify that a ExecuteFetchMultiDrain is happy to operate again after all the above.
err = conn.ExecuteFetchMultiDrain("update a set name = concat(name, ', multi drain 2'); select * from a; select count(*) from a")
expectNoError(t, err)

err = conn.ExecuteFetchMultiDrain("update b set name = concat(name, ' nonexistent table'); select * from a; select count(*) from a")
require.Error(t, err)

_, err = conn.ExecuteFetch("drop table a", 10, true)
require.NoError(t, err)
}
Expand Down
44 changes: 43 additions & 1 deletion go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package mysql

import (
"errors"
"fmt"
"math"
"strconv"
Expand All @@ -34,6 +35,17 @@

// This file contains the methods related to queries.

var (
ErrExecuteFetchMultipleResults = vterrors.Errorf(vtrpc.Code_INTERNAL, "unexpected multiple results. Use ExecuteFetchMulti instead.")
)

const (
// Use as `maxrows` in `ExecuteFetch` and related functions, to indicate no rows should be fetched.
// This is different than specifying `0`, because `0` means "expect zero results", while this means
// "do not attempt to read any results into memory".
FETCH_NO_ROWS = math.MinInt
)

//
// Client side methods.
//
Expand Down Expand Up @@ -303,10 +315,35 @@
// 2. if the server closes the connection when a command is in flight,
// readComQueryResponse will fail, and we'll return CRServerLost(2013).
func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) {
result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields)
result, more, err := c.ExecuteFetchMulti(query, maxrows, wantfields)
if more {
// Multiple results are unexpected. Prioritize this "unexpected" error over whatever error we got from the first result.
err = errors.Join(ErrExecuteFetchMultipleResults, err)
}

Check warning on line 322 in go/mysql/query.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/query.go#L320-L322

Added lines #L320 - L322 were not covered by tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: should we iterate and consume all results?

Copy link
Contributor

@dbussink dbussink Jan 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we must here, otherwise we leave the connection in an invalid state. And a subsequent query on the same connection would see the previous result here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-noach are there other places in the code base where we pass in 0 incorrectly and do want to consume all the results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. ExecuteFetchMulti potentially? But then, this bugs me, because we should be able to pass maxrows = 17 in any place, so why would the draining in ExecuteFetch necessarily have to use -1? And yet, it does, as per #14949 (comment). I'm not sure if this is again limited to stored procedure behavior. I don't think it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not other explicit c.ReadQueryResult(0, ...) call in the code, FWIW.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harshit-gangal further edited the ExecuteFetch/drain logic to fix potential leaks, and consolidated the draining logic. I think we should be good now.

// draining to make the connection clean.
err = c.drainMoreResults(more, err)
return result, err
}

// ExecuteFetchMultiDrain is for executing multiple statements in one call, but without
// caring for any results. The function returns an error if any of the statements fail.
// The function drains the query results of all statements, even if there's an error.
func (c *Conn) ExecuteFetchMultiDrain(query string) (err error) {
_, more, err := c.ExecuteFetchMulti(query, FETCH_NO_ROWS, false)
return c.drainMoreResults(more, err)

Check warning on line 333 in go/mysql/query.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/query.go#L331-L333

Added lines #L331 - L333 were not covered by tests
}

// drainMoreResults ensures to drain all query results, even if there's an error.
// We collect all errors until we consume all results.
func (c *Conn) drainMoreResults(more bool, err error) error {
for more {
var moreErr error
_, more, _, moreErr = c.ReadQueryResult(FETCH_NO_ROWS, false)
err = errors.Join(err, moreErr)
}

Check warning on line 343 in go/mysql/query.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/query.go#L340-L343

Added lines #L340 - L343 were not covered by tests
return err
}

// ExecuteFetchMulti is for fetching multiple results from a multi-statement result.
// It returns an additional 'more' flag. If it is set, you must fetch the additional
// results using ReadQueryResult.
Expand Down Expand Up @@ -459,6 +496,11 @@
return nil, false, 0, ParseErrorPacket(data)
}

if maxrows == FETCH_NO_ROWS {
c.recycleReadPacket()
continue
}

Check warning on line 502 in go/mysql/query.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/query.go#L501-L502

Added lines #L501 - L502 were not covered by tests

// Check we're not over the limit before we add more.
if len(result.Rows) == maxrows {
c.recycleReadPacket()
Expand Down
30 changes: 30 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,16 @@ func (vttablet *VttabletProcess) QueryTabletWithDB(query string, dbname string)
return executeQuery(conn, query)
}

// MultiQueryTabletWithDB lets you execute multiple queries on a specific DB in this tablet.
func (vttablet *VttabletProcess) MultiQueryTabletWithDB(query string, dbname string) error {
conn, err := vttablet.defaultConn(dbname)
if err != nil {
return err
}
defer conn.Close()
return executeMultiQuery(conn, query)
}

// executeQuery will retry the query up to 10 times with a small sleep in between each try.
// This allows the tests to be more robust in the face of transient failures.
func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) {
Expand All @@ -536,6 +546,26 @@ func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) {
return result, err
}

// executeMultiQuery will retry the given multi query up to 10 times with a small sleep in between each try.
// This allows the tests to be more robust in the face of transient failures.
func executeMultiQuery(dbConn *mysql.Conn, query string) (err error) {
retries := 10
retryDelay := 1 * time.Second
for i := 0; i < retries; i++ {
if i > 0 {
// We only audit from 2nd attempt and onwards, otherwise this is just too verbose.
log.Infof("Executing query %s (attempt %d of %d)", query, (i + 1), retries)
}
err = dbConn.ExecuteFetchMultiDrain(query)
if err == nil {
break
}
time.Sleep(retryDelay)
}

return err
}

// GetDBVar returns first matching database variable's value
func (vttablet *VttabletProcess) GetDBVar(varName string, ksName string) (string, error) {
return vttablet.getDBSystemValues("variables", varName, ksName)
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/mysqlserver/mysql_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestTimeout(t *testing.T) {
require.Nilf(t, err, "unable to connect mysql: %v", err)
defer conn.Close()

_, err = conn.ExecuteFetch("SELECT SLEEP(5);", 1, false)
_, err = conn.ExecuteFetch("SELECT SLEEP(5)", 1, false)
require.NotNilf(t, err, "quiry timeout error expected")
mysqlErr, ok := err.(*sqlerror.SQLError)
require.Truef(t, ok, "invalid error type")
Expand All @@ -132,7 +132,7 @@ func TestInvalidField(t *testing.T) {
require.Nilf(t, err, "unable to connect mysql: %v", err)
defer conn.Close()

_, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test;", 1, false)
_, err = conn.ExecuteFetch("SELECT invalid_field from vt_insert_test", 1, false)
require.NotNil(t, err, "invalid field error expected")
mysqlErr, ok := err.(*sqlerror.SQLError)
require.Truef(t, ok, "invalid error type")
Expand All @@ -153,7 +153,7 @@ func TestWarnings(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, qr.Rows, "number of rows")

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err, "SHOW WARNINGS")
assert.EqualValues(t, 1, len(qr.Rows), "number of rows")
assert.Contains(t, qr.Rows[0][0].String(), "VARCHAR(\"Warning\")", qr.Rows)
Expand All @@ -164,7 +164,7 @@ func TestWarnings(t *testing.T) {
_, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false)
require.NoError(t, err)

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err)
assert.Empty(t, qr.Rows)

Expand All @@ -175,7 +175,7 @@ func TestWarnings(t *testing.T) {
_, err = conn.ExecuteFetch("SELECT 1 from vt_insert_test limit 1", 1, false)
require.NoError(t, err)

qr, err = conn.ExecuteFetch("SHOW WARNINGS;", 1, false)
qr, err = conn.ExecuteFetch("SHOW WARNINGS", 1, false)
require.NoError(t, err)
assert.Empty(t, qr.Rows)
}
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet)
}

utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave;", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master;", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet)
}

utils.ValidateTopology(t, clusterInstance, true)
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})

// assert that there is indeed only 1 row in tablets[1
res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test;`, tablets[1])
res := utils.RunSQL(context.Background(), t, `select msg from vt_insert_test`, tablets[1])
assert.Equal(t, 1, len(res.Rows))

// Perform a graceful reparent operation
Expand Down Expand Up @@ -217,8 +217,8 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus

if !downPrimary {
// commands to stop the current primary
demoteCommands := "SET GLOBAL read_only = ON; FLUSH TABLES WITH READ LOCK; UNLOCK TABLES"
utils.RunSQL(ctx, t, demoteCommands, tablets[0])
demoteCommands := []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"}
utils.RunSQLs(ctx, t, demoteCommands, tablets[0])

//Get the position of the old primary and wait for the new one to catch up.
err := utils.WaitForReplicationPosition(t, tablets[0], tablets[1])
Expand Down
12 changes: 9 additions & 3 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,16 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet. All commands are
// run in a single connection.
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
tabletParams := getMysqlConnParam(tablet)
conn, err := mysql.Connect(ctx, &tabletParams)
require.Nil(t, err)
defer conn.Close()

for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
result := execute(t, conn, sql)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in this function is that we ensure all queries run in the same conection. This is required for sequences like []string{"SET GLOBAL read_only = ON", "FLUSH TABLES WITH READ LOCK", "UNLOCK TABLES"} above.

results = append(results, result)
}
return results
Expand Down Expand Up @@ -704,7 +710,7 @@ func SetReplicationSourceFailed(tablet *cluster.Vttablet, prsOut string) bool {

// CheckReplicationStatus checks that the replication for sql and io threads is setup as expected
func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, sqlThreadRunning bool, ioThreadRunning bool) {
res := RunSQL(ctx, t, "show slave status;", tablet)
res := RunSQL(ctx, t, "show slave status", tablet)
if ioThreadRunning {
require.Equal(t, "Yes", res.Rows[0][10].ToString())
} else {
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/tabletgateway/buffer/buffer_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
type threadParams struct {
quit bool
rpcs int // Number of queries successfully executed.
errors int // Number of failed queries.
errors []error // Errors returned by the queries.
waitForNotification chan bool // Channel used to notify the main thread that this thread executed
notifyLock sync.Mutex // notifyLock guards the two fields notifyAfterNSuccessfulRpcs/rpcsSoFar.
notifyAfterNSuccessfulRpcs int // If 0, notifications are disabled
Expand All @@ -96,14 +96,14 @@ func (c *threadParams) threadRun(wg *sync.WaitGroup, vtParams *mysql.ConnParams)
if c.reservedConn {
_, err = conn.ExecuteFetch("set default_week_format = 1", 1000, true)
if err != nil {
c.errors++
c.errors = append(c.errors, err)
log.Errorf("error setting default_week_format: %v", err)
}
}
for !c.quit {
err = c.executeFunction(c, conn)
if err != nil {
c.errors++
c.errors = append(c.errors, err)
log.Errorf("error executing function %s: %v", c.typ, err)
}
c.rpcs++
Expand Down Expand Up @@ -342,8 +342,8 @@ func (bt *BufferingTest) Test(t *testing.T) {
updateThreadInstance.stop()

// Both threads must not see any error
assert.Zero(t, readThreadInstance.errors, "found errors in read queries")
assert.Zero(t, updateThreadInstance.errors, "found errors in tx queries")
assert.Empty(t, readThreadInstance.errors, "found errors in read queries")
assert.Empty(t, updateThreadInstance.errors, "found errors in tx queries")

//At least one thread should have been buffered.
//This may fail if a failover is too fast. Add retries then.
Expand Down
Loading
Loading