Skip to content

Commit

Permalink
Merge pull request #731 from dolthub/jennifer/regression
Browse files Browse the repository at this point in the history
fix return limited row issue
  • Loading branch information
jennifersp authored Sep 21, 2024
2 parents 6da1bf1 + 8ee9d68 commit d2c759c
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 33 deletions.
66 changes: 35 additions & 31 deletions server/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
"os"
"strings"
Expand Down Expand Up @@ -176,7 +177,9 @@ func (h *ConnectionHandler) setConn(conn net.Conn) {

func (h *ConnectionHandler) handleStartup() error {
startupMessage, err := h.backend.ReceiveStartupMessage()
if err != nil {
if err == io.EOF {
startupMessage = &pgproto3.StartupMessage{}
} else if err != nil {
return fmt.Errorf("error receiving startup message: %w", err)
}

Expand All @@ -199,7 +202,7 @@ func (h *ConnectionHandler) handleStartup() error {
}
_, err = h.Conn().Write(performSSL)
if err != nil {
return fmt.Errorf("error sending deny SSL request: %w", err)
return fmt.Errorf("error sending SSL request: %w", err)
}
// If we have a certificate and the client has asked for SSL support, then we switch here.
// This involves swapping out our underlying net connection for a new one.
Expand Down Expand Up @@ -603,17 +606,30 @@ func (h *ConnectionHandler) handleExecute(message *pgproto3.Execute) error {
return err
}

// we need the CommandComplete message defined here because it's altered by the callback below
commandComplete := &pgproto3.CommandComplete{
CommandTag: []byte(query.StatementTag),
}
callback := h.spoolRowsCallback(query.String, commandComplete, true)
// |rowsAffected| gets altered by the callback below
rowsAffected := int32(0)

callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, true)
err = h.doltgresHandler.ComExecuteBound(context.Background(), h.mysqlConn, query.String, portalData.BoundPlan, callback)
if err != nil {
return err
}

return h.send(commandComplete)
return h.send(makeCommandComplete(query.StatementTag, rowsAffected))
}

func makeCommandComplete(tag string, rows int32) *pgproto3.CommandComplete {
switch tag {
case "INSERT", "DELETE", "UPDATE", "MERGE", "SELECT", "CREATE TABLE AS", "MOVE", "FETCH", "COPY":
if tag == "INSERT" {
tag = "INSERT 0"
}
tag = fmt.Sprintf("%s %d", tag, rows)
}

return &pgproto3.CommandComplete{
CommandTag: []byte(tag),
}
}

// handleCopyData handles the COPY DATA message, by loading the data sent from the client. The |stop| response parameter
Expand Down Expand Up @@ -775,10 +791,10 @@ func (h *ConnectionHandler) convertBindParameters(types []uint32, formatCodes []

// query runs the given query and sends a CommandComplete message to the client
func (h *ConnectionHandler) query(query ConvertedQuery) error {
commandComplete := &pgproto3.CommandComplete{
CommandTag: []byte(query.StatementTag),
}
callback := h.spoolRowsCallback(query.String, commandComplete, false)
// |rowsAffected| gets altered by the callback below
rowsAffected := int32(0)

callback := h.spoolRowsCallback(query.StatementTag, &rowsAffected, false)
err := h.doltgresHandler.ComQuery(context.Background(), h.mysqlConn, query.String, query.AST, callback)
if err != nil {
if strings.HasPrefix(err.Error(), "syntax error at position") {
Expand All @@ -787,17 +803,15 @@ func (h *ConnectionHandler) query(query ConvertedQuery) error {
return err
}

return h.send(commandComplete)
return h.send(makeCommandComplete(query.StatementTag, rowsAffected))
}

// spoolRowsCallback returns a callback function that will send RowDescription message, then a DataRow message for
// each row in the result set.
func (h *ConnectionHandler) spoolRowsCallback(query string, cc *pgproto3.CommandComplete, isExecute bool) func(res *Result) error {
// spoolRowsCallback returns a callback function that will send RowDescription message,
// then a DataRow message for each row in the result set.
func (h *ConnectionHandler) spoolRowsCallback(tag string, rows *int32, isExecute bool) func(res *Result) error {
// IsIUD returns whether the query is either an INSERT, UPDATE, or DELETE query.
q := strings.TrimSpace(strings.ToLower(query))
isIUD := strings.HasPrefix(q, "insert") || strings.HasPrefix(q, "update") || strings.HasPrefix(q, "delete")
isIUD := tag == "INSERT" || tag == "UPDATE" || tag == "DELETE"
return func(res *Result) error {
tag := string(cc.CommandTag)
if returnsRow(tag) {
// EXECUTE does not send RowDescription; instead it should be sent from DESCRIBE prior to it
if !isExecute {
Expand All @@ -817,22 +831,12 @@ func (h *ConnectionHandler) spoolRowsCallback(query string, cc *pgproto3.Command
}
}

var rows int32
if isIUD {
rows = int32(res.RowsAffected)
*rows = int32(res.RowsAffected)
} else {
rows += int32(len(res.Rows))
}

switch tag {
case "INSERT", "DELETE", "UPDATE", "MERGE", "SELECT", "CREATE TABLE AS", "MOVE", "FETCH", "COPY":
if tag == "INSERT" {
tag = "INSERT 0"
}
tag = fmt.Sprintf("%s %d", tag, rows)
*rows += int32(len(res.Rows))
}

cc.CommandTag = []byte(tag)
return nil
}
}
Expand Down
2 changes: 0 additions & 2 deletions server/doltgres_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func (h *DoltgresHandler) doQuery(ctx context.Context, c *mysql.Conn, query stri
schema, rowIter, qFlags, err := queryExec(sqlCtx, query, parsed, analyzedPlan)
if err != nil {
sqlCtx.GetLogger().WithError(err).Warn("error running query")
fmt.Printf("Err: %+v", err)
return err
}

Expand Down Expand Up @@ -518,7 +517,6 @@ func (h *DoltgresHandler) resultForDefaultIter(ctx *sql.Context, schema sql.Sche
err := eg.Wait()
if err != nil {
ctx.GetLogger().WithError(err).Warn("error running query")
fmt.Printf("Err: %+v", err)
returnErr = err
}

Expand Down
54 changes: 54 additions & 0 deletions testing/go/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,5 +757,59 @@ func TestSmokeTests(t *testing.T) {
},
},
},
{
Name: "200 Row Test",
SetUpScript: []string{
"CREATE TABLE test (pk INT8 PRIMARY KEY);",
"INSERT INTO test VALUES " +
"(1), (2), (3), (4), (5), (6), (7), (8), (9), (10)," +
"(11), (12), (13), (14), (15), (16), (17), (18), (19), (20)," +
"(21), (22), (23), (24), (25), (26), (27), (28), (29), (30)," +
"(31), (32), (33), (34), (35), (36), (37), (38), (39), (40)," +
"(41), (42), (43), (44), (45), (46), (47), (48), (49), (50)," +
"(51), (52), (53), (54), (55), (56), (57), (58), (59), (60)," +
"(61), (62), (63), (64), (65), (66), (67), (68), (69), (70)," +
"(71), (72), (73), (74), (75), (76), (77), (78), (79), (80)," +
"(81), (82), (83), (84), (85), (86), (87), (88), (89), (90)," +
"(91), (92), (93), (94), (95), (96), (97), (98), (99), (100)," +
"(101), (102), (103), (104), (105), (106), (107), (108), (109), (110)," +
"(111), (112), (113), (114), (115), (116), (117), (118), (119), (120)," +
"(121), (122), (123), (124), (125), (126), (127), (128), (129), (130)," +
"(131), (132), (133), (134), (135), (136), (137), (138), (139), (140)," +
"(141), (142), (143), (144), (145), (146), (147), (148), (149), (150)," +
"(151), (152), (153), (154), (155), (156), (157), (158), (159), (160)," +
"(161), (162), (163), (164), (165), (166), (167), (168), (169), (170)," +
"(171), (172), (173), (174), (175), (176), (177), (178), (179), (180)," +
"(181), (182), (183), (184), (185), (186), (187), (188), (189), (190)," +
"(191), (192), (193), (194), (195), (196), (197), (198), (199), (200);",
},
Assertions: []ScriptTestAssertion{
{
Query: "SELECT * FROM test ORDER BY pk;",
Expected: []sql.Row{
{1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10},
{11}, {12}, {13}, {14}, {15}, {16}, {17}, {18}, {19}, {20},
{21}, {22}, {23}, {24}, {25}, {26}, {27}, {28}, {29}, {30},
{31}, {32}, {33}, {34}, {35}, {36}, {37}, {38}, {39}, {40},
{41}, {42}, {43}, {44}, {45}, {46}, {47}, {48}, {49}, {50},
{51}, {52}, {53}, {54}, {55}, {56}, {57}, {58}, {59}, {60},
{61}, {62}, {63}, {64}, {65}, {66}, {67}, {68}, {69}, {70},
{71}, {72}, {73}, {74}, {75}, {76}, {77}, {78}, {79}, {80},
{81}, {82}, {83}, {84}, {85}, {86}, {87}, {88}, {89}, {90},
{91}, {92}, {93}, {94}, {95}, {96}, {97}, {98}, {99}, {100},
{101}, {102}, {103}, {104}, {105}, {106}, {107}, {108}, {109}, {110},
{111}, {112}, {113}, {114}, {115}, {116}, {117}, {118}, {119}, {120},
{121}, {122}, {123}, {124}, {125}, {126}, {127}, {128}, {129}, {130},
{131}, {132}, {133}, {134}, {135}, {136}, {137}, {138}, {139}, {140},
{141}, {142}, {143}, {144}, {145}, {146}, {147}, {148}, {149}, {150},
{151}, {152}, {153}, {154}, {155}, {156}, {157}, {158}, {159}, {160},
{161}, {162}, {163}, {164}, {165}, {166}, {167}, {168}, {169}, {170},
{171}, {172}, {173}, {174}, {175}, {176}, {177}, {178}, {179}, {180},
{181}, {182}, {183}, {184}, {185}, {186}, {187}, {188}, {189}, {190},
{191}, {192}, {193}, {194}, {195}, {196}, {197}, {198}, {199}, {200},
},
},
},
},
})
}

0 comments on commit d2c759c

Please sign in to comment.