Skip to content

Commit

Permalink
Adding support for automatic transaction management with COPY FROM STDIN
Browse files Browse the repository at this point in the history
  • Loading branch information
fulghum committed Sep 24, 2024
1 parent 2841560 commit fc759de
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 2 deletions.
35 changes: 34 additions & 1 deletion server/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"sync/atomic"

"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqlserver"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
Expand Down Expand Up @@ -641,11 +642,15 @@ func (h *ConnectionHandler) handleCopyData(message *pgproto3.CopyData) (stop boo
return false, true, fmt.Errorf("COPY DATA message received without a COPY FROM STDIN operation in progress")
}

// Grab a sql.Context
// Grab a sql.Context and ensure the session has a transaction started, otherwise the copied data
// won't get committed correctly.
sqlCtx, err := h.doltgresHandler.NewContext(context.Background(), h.mysqlConn, "")
if err != nil {
return false, false, err
}
if err = startTransaction(sqlCtx); err != nil {
return false, false, err
}

dataLoader := h.copyFromStdinState.dataLoader
if dataLoader == nil {
Expand Down Expand Up @@ -723,6 +728,17 @@ func (h *ConnectionHandler) handleCopyDone(_ *pgproto3.CopyDone) (stop bool, end
return false, false, err
}

// If we aren't in an explicit/user managed transaction, we need to commit the transaction
if !sqlCtx.GetIgnoreAutoCommit() {
txSession, ok := sqlCtx.Session.(sql.TransactionSession)
if !ok {
return false, false, fmt.Errorf("session does not implement sql.TransactionSession")
}
if err = txSession.CommitTransaction(sqlCtx, txSession.GetTransaction()); err != nil {
return false, false, err
}
}

h.copyFromStdinState = nil
// We send back endOfMessage=true, since the COPY DONE message ends the COPY DATA flow and the server is ready
// to accept the next query now.
Expand Down Expand Up @@ -753,6 +769,23 @@ func (h *ConnectionHandler) handleCopyFail(_ *pgproto3.CopyFail) (stop bool, end
return false, true, nil
}

// startTransaction checks to see if the current session has a transaction started yet or not, and if not,
// creates a read/write transaction for the session to use. This is necessary for handling commands that alter
// data without going through the GMS engine.
func startTransaction(ctx *sql.Context) error {
doltSession, ok := ctx.Session.(*dsess.DoltSession)
if !ok {
return fmt.Errorf("unexpected session type: %T", ctx.Session)
}
if doltSession.GetTransaction() == nil {
if _, err := doltSession.StartTransaction(ctx, sql.ReadWrite); err != nil {
return err
}
}

return nil
}

func (h *ConnectionHandler) deallocatePreparedStatement(name string, preparedStatements map[string]PreparedStatementData, query ConvertedQuery, conn net.Conn) error {
_, ok := preparedStatements[name]
if !ok {
Expand Down
34 changes: 33 additions & 1 deletion testing/bats/dataloading.bats
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ teardown() {
[[ "$output" =~ "3 | 03 | 97302 | Guyane" ]] || false
}

# Tests that we can load tabular data dump files that do not explicitly manage the session's transaction.
@test 'dataloading: tabular import, no explicit tx management' {
# Import the data dump and assert the expected output
run query_server -f $BATS_TEST_DIRNAME/dataloading/tab-load-with-no-tx-control.sql
[ "$status" -eq 0 ]
[[ "$output" =~ "COPY 3" ]] || false
[[ ! "$output" =~ "ERROR" ]] || false

# Check the inserted rows
run query_server -c "SELECT * FROM test_info ORDER BY id;"
[ "$status" -eq 0 ]
[[ "$output" =~ "4 | string for 4 | 1" ]] || false
[[ "$output" =~ "5 | string for 5 | 0" ]] || false
[[ "$output" =~ "6 | string for 6 | 0" ]] || false
}

# Tests loading in data via different CSV data files.
@test 'dataloading: csv import' {
# Import the data dump and assert the expected output
Expand Down Expand Up @@ -157,4 +173,20 @@ teardown() {
run query_server -c "SELECT count(*) from tbl1;"
[ "$status" -eq 0 ]
[[ "$output" =~ "100" ]] || false
}
}

# Tests that we can load CSV data dump files that do not explicitly manage the session's transaction.
@test 'dataloading: csv import, no explicit tx management' {
# Import the data dump and assert the expected output
run query_server -f $BATS_TEST_DIRNAME/dataloading/csv-load-with-no-tx-control.sql
[ "$status" -eq 0 ]
[[ "$output" =~ "COPY 3" ]] || false
[[ ! "$output" =~ "ERROR" ]] || false

# Check the inserted rows
run query_server -c "SELECT * FROM test_info ORDER BY id;"
[ "$status" -eq 0 ]
[[ "$output" =~ "4 | string for 4 | 1" ]] || false
[[ "$output" =~ "5 | string for 5 | 0" ]] || false
[[ "$output" =~ "6 | string for 6 | 0" ]] || false
}
11 changes: 11 additions & 0 deletions testing/bats/dataloading/csv-load-with-no-tx-control.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE test (pk int primary key);
INSERT INTO test VALUES (0), (1);

CREATE TABLE test_info (id int, info varchar(255), test_pk int, primary key(id), foreign key (test_pk) references test(pk));

COPY test_info FROM STDIN (FORMAT CSV, HEADER TRUE);
id,info,test_pk
4,string for 4,1
5,string for 5,0
6,string for 6,0
\.
11 changes: 11 additions & 0 deletions testing/bats/dataloading/tab-load-with-no-tx-control.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE test (pk int primary key);
INSERT INTO test VALUES (0), (1);

CREATE TABLE test_info (id int, info varchar(255), test_pk int, primary key(id), foreign key (test_pk) references test(pk));

COPY test_info FROM STDIN WITH (HEADER);
id info test_pk
4 string for 4 1
5 string for 5 0
6 string for 6 0
\.

0 comments on commit fc759de

Please sign in to comment.