Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: prepare now prepares on all nodes #266

Merged
merged 3 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scylla
import (
"fmt"
"log"
"sync"

"github.com/mmatczuk/scylla-go-driver/frame"
"github.com/mmatczuk/scylla-go-driver/transport"
Expand Down Expand Up @@ -147,24 +148,39 @@ func (s *Session) Query(content string) Query {
}

func (s *Session) Prepare(content string) (Query, error) {
n := s.policy.Node(s.cluster.NewQueryInfo(), 0)
conn := n.LeastBusyConn()
if conn == nil {
return Query{}, errNoConnection
}

stmt := transport.Statement{Content: content, Consistency: frame.ALL}
res, err := conn.Prepare(stmt)

return Query{session: s,
stmt: res,
exec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
return conn.Execute(stmt, pagingState)
},
asyncExec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
conn.AsyncExecute(stmt, pagingState, handler)
},
}, err
// Prepare on all nodes concurrently.
nodes := s.cluster.Topology().Nodes
resStmt := make([]transport.Statement, len(nodes))
resErr := make([]error, len(nodes))
var wg sync.WaitGroup
for i := range nodes {
wg.Add(1)
go func(idx int) {
defer wg.Done()
resStmt[idx], resErr[idx] = nodes[idx].Prepare(stmt)
}(i)
}
wg.Wait()

// Find first result that succeeded.
for i := range nodes {
if resErr[i] == nil {
return Query{
session: s,
stmt: resStmt[i],
exec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes) (transport.QueryResult, error) {
return conn.Execute(stmt, pagingState)
},
asyncExec: func(conn *transport.Conn, stmt transport.Statement, pagingState frame.Bytes, handler transport.ResponseHandler) {
conn.AsyncExecute(stmt, pagingState, handler)
},
}, nil
}
}

return Query{}, fmt.Errorf("prepare failed on all nodes, details: %v", resErr)
}

func (s *Session) NewTokenAwarePolicy() transport.HostSelectionPolicy {
Expand Down
65 changes: 65 additions & 0 deletions session_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"testing"
"time"

"go.uber.org/goleak"
)
Expand Down Expand Up @@ -264,6 +266,8 @@ func makeCertificatesFromFiles(t *testing.T, certPath, keyPath string) []tls.Cer
}

func TestTLSIntegration(t *testing.T) {
defer goleak.VerifyNone(t)

testCases := []struct {
name string
tlsConfig *tls.Config
Expand Down Expand Up @@ -306,6 +310,7 @@ func TestTLSIntegration(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer session.Close()

stmts := []string{
"CREATE KEYSPACE IF NOT EXISTS mykeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
Expand Down Expand Up @@ -338,3 +343,63 @@ func TestTLSIntegration(t *testing.T) {
})
}
}

func TestPrepareIntegration(t *testing.T) {
defer goleak.VerifyNone(t)

cfg := DefaultSessionConfig("", TestHost)
session, err := NewSession(cfg)
defer session.Close()

if err != nil {
t.Fatal(err)
}

initStmts := []string{
"DROP KEYSPACE IF EXISTS testks",
"CREATE KEYSPACE IF NOT EXISTS testks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}",
"CREATE TABLE IF NOT EXISTS testks.doubles (pk bigint PRIMARY KEY, v bigint)",
}

for _, stmt := range initStmts {
q := session.Query(stmt)
if _, err := q.Exec(); err != nil {
t.Fatal(err)
}

// Await schema agreement, TODO: implement true schema agreement.
time.Sleep(time.Second)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the sleep is here in order to simulate a "wait for schema agreement" operation? I suggest placing a TODO comment here so that it will be easier to find the workaround-sleeps when somebody gets to implement it.

}

q, err := session.Prepare("INSERT INTO testks.doubles (pk, v) VALUES (?, ?)")
if err != nil {
t.Fatal(err)
}

for i := int64(0); i < 1000; i++ {
_, err := q.BindInt64(0, i).BindInt64(1, 2*i).Exec()
if err != nil {
t.Fatal(err)
}
}

for i := int64(0); i < 1000; i++ {
q, err := session.Prepare("SELECT v FROM testks.doubles WHERE pk = " + fmt.Sprint(i))
if err != nil {
t.Fatal(err)
}

for rep := 0; rep < 3; rep++ {
res, err := q.Exec()
if err != nil {
t.Fatal(err)
}

if v, err := res.Rows[0][0].AsInt64(); err != nil {
t.Fatal(err)
} else if v != 2*i {
t.Fatalf("expected %d, got %d", 2*i, v)
}
}
}
}
10 changes: 7 additions & 3 deletions transport/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type topology struct {
localDC string
peers peerMap
dcRacks dcRacksMap
nodes []*Node
Nodes []*Node
policyInfo policyInfo
keyspaces ksMap
}
Expand Down Expand Up @@ -94,6 +94,10 @@ func (c *Cluster) NewTokenAwareQueryInfo(t Token, ks string) (QueryInfo, error)
top := c.Topology()
// When keyspace is not specified, we take default keyspace from ConnConfig.
if ks == "" {
if c.cfg.Keyspace == "" {
// We don't know anything about the keyspace, fallback to non-token aware query.
return c.NewQueryInfo(), nil
}
Comment on lines +97 to +100
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change necessary for the purpose of this PR (preparing statements on all nodes)? If not, then perhaps it should be sent as a separate PR, or - at least - as a separate commit?

Copy link
Contributor Author

@Kulezi Kulezi Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating the test i encountered a missing keyspace error that "" was not found in system_schema.keyspaces, currently the driver always assumes it's querying the default session keyspace which needs to be changed with #257, right now the driver doesn't save info about keyspace from prepared results, hence i needed this workaround. I prefer to do this all with #257.

I moved the workaround to another commit.

ks = c.cfg.Keyspace
}
if stg, ok := top.keyspaces[ks]; ok {
Expand Down Expand Up @@ -219,7 +223,7 @@ func (c *Cluster) refreshTopology() error {
// Every encountered node becomes known host for future use.
c.knownHosts[n.addr] = struct{}{}
t.peers[n.addr] = n
t.nodes = append(t.nodes, n)
t.Nodes = append(t.Nodes, n)
u[uniqueRack{dc: n.datacenter, rack: n.rack}] = struct{}{}
if err := parseTokensFromRow(n, r, &t.policyInfo.ring); err != nil {
return err
Expand Down Expand Up @@ -251,7 +255,7 @@ func newTopology() *topology {
return &topology{
peers: make(peerMap),
dcRacks: make(dcRacksMap),
nodes: make([]*Node, 0),
Nodes: make([]*Node, 0),
policyInfo: policyInfo{
ring: make(Ring, 0),
},
Expand Down
4 changes: 4 additions & 0 deletions transport/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (n *Node) Conn(token Token) *Conn {
return n.pool.Conn(token)
}

func (n *Node) Prepare(s Statement) (Statement, error) {
return n.LeastBusyConn().Prepare(s)
}

type RingEntry struct {
node *Node
token Token
Expand Down
6 changes: 3 additions & 3 deletions transport/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (pi *policyInfo) Preprocess(t *topology, ks keyspace) {
}

func (pi *policyInfo) preprocessSimpleStrategy(t *topology, stg strategy) {
pi.localNodes = t.nodes
pi.localNodes = t.Nodes
sort.Sort(pi.ring)
trie := trieRoot()
for i := range pi.ring {
Expand Down Expand Up @@ -122,14 +122,14 @@ func (pi *policyInfo) preprocessSimpleStrategy(t *topology, stg strategy) {
}

func (pi *policyInfo) preprocessRoundRobinStrategy(t *topology) {
pi.localNodes = t.nodes
pi.localNodes = t.Nodes
pi.remoteNodes = nil
}

func (pi *policyInfo) preprocessDCAwareRoundRobinStrategy(t *topology) {
pi.localNodes = make([]*Node, 0)
pi.remoteNodes = make([]*Node, 0)
for _, v := range t.nodes {
for _, v := range t.Nodes {
if v.datacenter == t.localDC {
pi.localNodes = append(pi.localNodes, v)
} else {
Expand Down
6 changes: 3 additions & 3 deletions transport/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func mockTopologyRoundRobin() *topology {
}

return &topology{
nodes: dummyNodes,
Nodes: dummyNodes,
}
}

Expand Down Expand Up @@ -173,7 +173,7 @@ func mockTopologyTokenAwareSimpleStrategy() *topology {
}

return &topology{
nodes: dummyNodes,
Nodes: dummyNodes,
policyInfo: policyInfo{
ring: ring,
},
Expand Down Expand Up @@ -288,7 +288,7 @@ func mockTopologyTokenAwareDCAwareStrategy() *topology {

return &topology{
dcRacks: dcs,
nodes: dummyNodes,
Nodes: dummyNodes,
policyInfo: policyInfo{ring: ring},
keyspaces: ks,
}
Expand Down