Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Scale mutation queue with multiple shards #1048

Merged
merged 11 commits into from
Oct 11, 2018
5 changes: 3 additions & 2 deletions core/adminserver/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
return nil, fmt.Errorf("adminserver: domains.Write(): %v", err)
}

// Create shards for queue.
shardIDs := []int64{1}
// Create initial shards for queue.
// TODO(#1063): Additional shards can be added at a later point to support increased server load.
shardIDs := []int64{1, 2}
if err := s.queueAdmin.AddShards(ctx, in.GetDomainId(), shardIDs...); err != nil {
return nil, fmt.Errorf("adminserver: AddShards(%v): %v", shardIDs, err)
}
Expand Down
8 changes: 4 additions & 4 deletions core/adminserver/admin_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *miniEnv) Close() {

type fakeQueueAdmin struct{}

func (*fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
func (fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
return nil
}

Expand Down Expand Up @@ -177,7 +177,7 @@ func TestCreateRead(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestDelete(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestListDomains(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainIDs []string
Expand Down
9 changes: 5 additions & 4 deletions core/sequencer/sequencer_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ import "google/protobuf/empty.proto";
message MapMetadata {
// SourceSlice is the range of inputs that have been included in a map revision.
message SourceSlice {
// lowest_watermark is the lowest primary key (exclusive) the source log
// that has been incorporated into this map revision. The primary keys of
// logged items MUST be monotonically increasing.
// lowest_watermark is the lowest primary key (exclusive) of the source
// log that has been incorporated into this map revision. The primary
// keys of logged items MUST be monotonically increasing.
int64 lowest_watermark = 1;
// highest_watermark is the highest primary key (inclusive) of the source
// log that has been incorporated into this map revision. The primary keys
// of logged items MUST be monotonically increasing.
int64 highest_watermark = 2;
}
reserved 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you at a point where you can't afford breaking compatibility by simply removing this field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fortunately we're early enough that we're happy to break compatibility.
This PR is also introducing backwards-incompatible database schema changes as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that was my point. Why using reserved 1 instead of just dropping / reassigning the tag to the new field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good habits I suppose :-)

// sources defines the ranges of inputs used for this map revision for each slice.
// sources is a map from log source IDs to the (low, high] range of primary keys
// in each slice that were used to construct this map revision.
map<int64, SourceSlice> sources = 2;
}

Expand Down
7 changes: 4 additions & 3 deletions core/sequencer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func createMetrics(mf monitoring.MetricFactory) {

// Queue reads messages that haven't been deleted off the queue.
type Queue interface {
// HighWatermark returns the highest timestamp in the mutations table.
// HighWatermarks returns the highest primary key for each shard in the mutations table.
HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error)
// Read returns up to batchSize messages for domainID.
ReadQueue(ctx context.Context, domainID string, shard, low, high int64) ([]*mutator.QueueMessage, error)
// ReadQueue returns the messages under shardID in the (low, high] range.
// ReadQueue does NOT delete messages
gdbelvin marked this conversation as resolved.
Show resolved Hide resolved
ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error)
}

// Server implements KeyTransparencySequencerServer.
Expand Down
13 changes: 8 additions & 5 deletions impl/sql/mutationstorage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ import (
"math/rand"
"time"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/google/keytransparency/core/mutator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/golang/glog"
"github.com/golang/protobuf/proto"

pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto"
)

// AddShards creates and adds new shards for queue writing to a domain.
func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
glog.Infof("mutationstorage: AddShard(%v, %v)", domainID, shardIDs)
for _, shardID := range shardIDs {
// TODO(gdbelvin): Use INSERT IGNORE to allow this function to be retried.
// TODO(gdbelvin): Migrate to a MySQL Docker image for unit tests.
// MySQL and SQLite do not have the same syntax for INSERT IGNORE.
if _, err := m.db.ExecContext(ctx,
`INSERT INTO Shards (DomainID, ShardID, Enabled) Values(?, ?, ?);`,
domainID, shardID, true); err != nil {
Expand Down Expand Up @@ -69,6 +71,7 @@ func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, erro
if err != nil {
return 0, err
}
defer rows.Close()
for rows.Next() {
gdbelvin marked this conversation as resolved.
Show resolved Hide resolved
var shardID int64
if err := rows.Scan(&shardID); err != nil {
Expand Down Expand Up @@ -126,7 +129,7 @@ func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mD
return tx.Commit()
}

// HighWatermarks returns the highest timestamp in the mutations table.
// HighWatermarks returns the highest timestamp for each shard in the mutations table.
func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) {
watermarks := make(map[int64]int64)
rows, err := m.db.QueryContext(ctx,
Expand All @@ -149,7 +152,7 @@ func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[in
return watermarks, nil
}

// ReadQueue reads all mutations that are still in the queue.
// ReadQueue reads all mutations in shardID between (low, high].
func (m *Mutations) ReadQueue(ctx context.Context,
domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) {
rows, err := m.db.QueryContext(ctx,
Expand Down
14 changes: 6 additions & 8 deletions impl/sql/mutationstorage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,22 @@ func TestRandShard(t *testing.T) {

for _, tc := range []struct {
desc string
send bool
send []int64
wantCode codes.Code
wantShards map[int64]bool
}{
{desc: "no rows", wantCode: codes.NotFound, wantShards: map[int64]bool{}},
{desc: "second", send: true, wantShards: map[int64]bool{
{desc: "second", send: []int64{1, 2, 3}, wantShards: map[int64]bool{
gdbelvin marked this conversation as resolved.
Show resolved Hide resolved
1: true,
2: true,
3: true,
}},
} {
if tc.send {
if err := m.AddShards(ctx, domainID, 1, 2, 3); err != nil {
t.Fatalf("AddShards(): %v", err)
}
if err := m.AddShards(ctx, domainID, tc.send...); err != nil {
t.Fatalf("AddShards(): %v", err)
}
shards := make(map[int64]bool)
for i := 0; i < 20; i++ {
for i := 0; i < 10*len(tc.wantShards); i++ {
shard, err := m.randShard(ctx, domainID)
if got, want := status.Code(err), tc.wantCode; got != want {
t.Errorf("randShard(): %v, want %v", got, want)
Expand Down Expand Up @@ -106,7 +104,7 @@ func TestSend(t *testing.T) {
}
}

func TestWatermark(t *testing.T) {
func TestWatermarks(t *testing.T) {
ctx := context.Background()
db := newDB(t)
m, err := New(db)
Expand Down