Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Don't use RAND() / RANDOM()
Browse files Browse the repository at this point in the history
Implement shard selection ourselves.

- RAND and RANDOM are specific to MYSQL and SQLITE respectively.
- Pave the way to caching the list of active shards in memory.
  • Loading branch information
gdbelvin committed Sep 25, 2018
1 parent 9cf8af8 commit 7584613
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 9 deletions.
39 changes: 30 additions & 9 deletions impl/sql/mutationstorage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package mutationstorage
import (
"context"
"database/sql"
"math/rand"
"time"

"github.com/google/keytransparency/core/mutator"
Expand All @@ -42,18 +43,38 @@ func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ...
return nil
}

// randShard returns a random, enabled shard for domainID.
func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, error) {
// Read all enabled shards for domainID.
// TODO(gbelvin): Cache these results.
var shardIDs []int64
rows, err := m.db.QueryContext(ctx,
`SELECT ShardID from Shards WHERE DomainID = ? AND Enabled = ?;`,
domainID, true)
if err != nil {
return 0, err
}
for rows.Next() {
var shardID int64
rows.Scan(&shardID)
shardIDs = append(shardIDs, shardID)
}
if err := rows.Err(); err != nil {
return 0, err
}
if len(shardIDs) == 0 {
return 0, status.Errorf(codes.NotFound, "No shard found for domain %v", domainID)
}

// Return a random shard.
return shardIDs[rand.Intn(len(shardIDs))], nil
}

// Send writes mutations to the leading edge (by sequence number) of the mutations table.
func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryUpdate) error {
glog.Infof("mutationstorage: Send(%v, <mutation>)", domainID)
// Select a shard to write to
var shardID int64
err := m.db.QueryRowContext(ctx,
`SELECT ShardID from Shards WHERE DomainID = ? AND Enabled = ? ORDER BY RANDOM() LIMIT 1;`,
domainID, true).Scan(&shardID)
switch {
case err == sql.ErrNoRows:
return status.Errorf(codes.NotFound, "No shard found for domain %v", domainID)
case err != nil:
shardID, err := m.randShard(ctx, domainID)
if err != nil {
return err
}

Expand Down
48 changes: 48 additions & 0 deletions impl/sql/mutationstorage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,59 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

_ "github.com/mattn/go-sqlite3"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
)

func TestRandShard(t *testing.T) {
ctx := context.Background()
db := newDB(t)
m, err := New(db)
if err != nil {
t.Fatalf("Failed to create mutations: %v", err)
}
domainID := "foo"

for _, tc := range []struct {
desc string
send bool
wantCode codes.Code
wantShards map[int64]bool
}{
{desc: "no rows", wantCode: codes.NotFound, wantShards: map[int64]bool{}},
{desc: "second", send: true, wantShards: map[int64]bool{
1: true,
2: true,
3: true,
}},
} {
if tc.send {
if err := m.AddShards(ctx, domainID, 1, 2, 3); err != nil {
t.Fatalf("AddShards(): %v", err)
}
}
shards := make(map[int64]bool)
for i := 0; i < 20; i++ {
shard, err := m.randShard(ctx, domainID)
if got, want := status.Code(err), tc.wantCode; got != want {
t.Errorf("randShard(): %v, want %v", got, want)
}
if err != nil {
break
}
shards[shard] = true
}
if got, want := shards, tc.wantShards; !cmp.Equal(got, want) {
t.Errorf("shards: %v, want %v", got, want)
}
}
}

func TestWatermark(t *testing.T) {
ctx := context.Background()
db := newDB(t)
Expand Down

0 comments on commit 7584613

Please sign in to comment.