From 7173c8a497da17c111ef807e1c7868fd65f90d78 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Fri, 21 Sep 2018 19:25:19 +0100 Subject: [PATCH 01/10] Add Shards to Queue --- impl/sql/mutationstorage/mutations.go | 7 ++++ impl/sql/mutationstorage/queue.go | 48 +++++++++++++++++++++----- impl/sql/mutationstorage/queue_test.go | 12 +++++-- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/impl/sql/mutationstorage/mutations.go b/impl/sql/mutationstorage/mutations.go index 3e5be0192..588566d58 100644 --- a/impl/sql/mutationstorage/mutations.go +++ b/impl/sql/mutationstorage/mutations.go @@ -50,9 +50,16 @@ var ( );`, `CREATE TABLE IF NOT EXISTS Queue ( DomainID VARCHAR(30) NOT NULL, + ShardID BIGINT NOT NULL, Time BIGINT NOT NULL, Mutation BLOB NOT NULL, PRIMARY KEY(DomainID, Time) + );`, + `CREATE TABLE IF NOT EXISTS Shards ( + DomainID VARCHAR(30) NOT NULL, + ShardID BIGINT NOT NULL, + Write INT NOT NULL, + PRIMARY KEY(DomainID, ShardID) );`, } ) diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 284ba3212..cae0ad536 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -29,20 +29,52 @@ import ( pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto" ) +// AddShards creates and adds new shards for queue writing to a domain. +func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { + glog.Infof("mutationstorage: AddShard(%v, %v)", domainID, shardIDs) + for _, shardID := range shardIDs { + if _, err := m.db.ExecContext(ctx, + `INSERT INTO Shards (DomainID, ShardID, Write) Values(?, ?, ?);`, + domainID, shardID, true); err != nil { + return err + } + } + return nil +} + // Send writes mutations to the leading edge (by sequence number) of the mutations table. func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryUpdate) error { glog.Infof("mutationstorage: Send(%v, )", domainID) + // TODO(gbelvin): Implement retry with backoff for retryable errors if + // we get timestamp contention. + shardID, err := m.randomShard(ctx, domainID) + if err != nil { + return err + } mData, err := proto.Marshal(update) if err != nil { return err } - // TODO(gbelvin): Implement retry with backoff for retryable errors if - // we get timestamp contention. - return m.send(ctx, domainID, mData, time.Now()) + return m.send(ctx, domainID, shardID, mData, time.Now()) +} + +// randomShard returns a random shard from the list of active shards for domainID. +func (m *Mutations) randomShard(ctx context.Context, domainID string) (int64, error) { + var shardID int64 + err := m.db.QueryRowContext(ctx, + `SELECT ShardID from Shards WHERE DomainID = ? ORDER BY RANDOM() LIMIT 1;`, + domainID).Scan(&shardID) + switch { + case err == sql.ErrNoRows: + return 0, status.Errorf(codes.NotFound, "No shard found for domain %v", domainID) + case err != nil: + return 0, err + } + return shardID, nil } // ts must be greater than all other timestamps currently recorded for domainID. -func (m *Mutations) send(ctx context.Context, domainID string, mData []byte, ts time.Time) error { +func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mData []byte, ts time.Time) error { tx, err := m.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if err != nil { return err @@ -50,8 +82,8 @@ func (m *Mutations) send(ctx context.Context, domainID string, mData []byte, ts var maxTime int64 if err := tx.QueryRowContext(ctx, - `SELECT COALESCE(MAX(Time), 0) FROM Queue WHERE DomainID = ?;`, - domainID).Scan(&maxTime); err != nil { + `SELECT COALESCE(MAX(Time), 0) FROM Queue WHERE DomainID = ? AND ShardID = ?;`, + domainID, shardID).Scan(&maxTime); err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { return status.Errorf(codes.Internal, "query err: %v and could not roll back: %v", err, rollbackErr) @@ -69,8 +101,8 @@ func (m *Mutations) send(ctx context.Context, domainID string, mData []byte, ts } if _, err = tx.ExecContext(ctx, - `INSERT INTO Queue (DomainID, Time, Mutation) VALUES (?, ?, ?);`, - domainID, tsTime, mData); err != nil { + `INSERT INTO Queue (DomainID, ShardID, Time, Mutation) VALUES (?, ?, ?, ?);`, + domainID, shardID, tsTime, mData); err != nil { if rollbackErr := tx.Rollback(); rollbackErr != nil { return status.Errorf(codes.Internal, "insert err: %v and could not roll back: %v", err, rollbackErr) diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 5627587af..fd276ccd4 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -38,6 +38,10 @@ func TestSend(t *testing.T) { ts2 := ts1.Add(time.Duration(1)) ts3 := ts2.Add(time.Duration(1)) + if err := m.AddShards(ctx, domainID, 1, 2); err != nil { + t.Fatalf("AddShards(): %v", err) + } + // Test cases are cumulative. Earlier test caes setup later test cases. for _, tc := range []struct { desc string @@ -51,7 +55,7 @@ func TestSend(t *testing.T) { {desc: "Old", ts: ts1, wantCode: codes.Aborted}, {desc: "New", ts: ts3}, } { - err := m.send(ctx, domainID, update, tc.ts) + err := m.send(ctx, domainID, 1, update, tc.ts) if got, want := status.Code(err), tc.wantCode; got != want { t.Errorf("%v: send(): %v, got: %v, want %v", tc.desc, err, got, want) } @@ -69,6 +73,10 @@ func TestWatermark(t *testing.T) { ts1 := time.Now() ts2 := ts1.Add(time.Duration(1)) + if err := m.AddShards(ctx, domainID, 1, 2); err != nil { + t.Fatalf("AddShards(): %v", err) + } + for _, tc := range []struct { desc string send bool @@ -80,7 +88,7 @@ func TestWatermark(t *testing.T) { {desc: "second", send: true, ts: ts2, want: ts2.UnixNano()}, } { if tc.send { - if err := m.send(ctx, domainID, []byte("foo"), tc.ts); err != nil { + if err := m.send(ctx, domainID, 1, []byte("foo"), tc.ts); err != nil { t.Fatalf("send(): %v", err) } } From 5437f9b25db6a9d897db0a1593b54e75b2cb529b Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Fri, 21 Sep 2018 19:41:36 +0100 Subject: [PATCH 02/10] Read Queue by shardID --- impl/sql/mutationstorage/queue.go | 8 ++++---- impl/sql/mutationstorage/queue_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index cae0ad536..42fc595c0 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -126,13 +126,13 @@ func (m *Mutations) HighWatermark(ctx context.Context, domainID string) (int64, } // ReadQueue reads all mutations that are still in the queue up to batchSize. -func (m *Mutations) ReadQueue(ctx context.Context, domainID string, low, high int64) ([]*mutator.QueueMessage, error) { +func (m *Mutations) ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) { rows, err := m.db.QueryContext(ctx, `SELECT Time, Mutation FROM Queue - WHERE DomainID = ? AND - Time > ? AND Time <= ? + WHERE DomainID = ? AND ShardID = ? + AND Time > ? AND Time <= ? ORDER BY Time ASC;`, - domainID, low, high) + domainID, shardID, low, high) if err != nil { return nil, err } diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index fd276ccd4..2dd9d6d06 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -22,6 +22,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto" _ "github.com/mattn/go-sqlite3" ) @@ -101,3 +102,28 @@ func TestWatermark(t *testing.T) { } } } + +func TestReadQueue(t *testing.T) { + ctx := context.Background() + db := newDB(t) + m, err := New(db) + if err != nil { + t.Fatalf("Failed to create mutations: %v", err) + } + domainID := "readqueue" + shardID := int64(5) + if err := m.AddShards(ctx, domainID, shardID); err != nil { + t.Fatalf("AddShards(): %v", err) + } + if err := m.Send(ctx, domainID, &pb.EntryUpdate{}); err != nil { + t.Fatalf("Send(): %v", err) + } + + rows, err := m.ReadQueue(ctx, domainID, shardID, 0, time.Now().UnixNano()) + if err != nil { + t.Fatalf("ReadQueue(): %v", err) + } + if got, want := len(rows), 1; got != want { + t.Fatalf("ReadQueue(): len: %v, want %v", got, want) + } +} From fc8c64fffdb73f75cf8ab4c4f9f4d4fb454a73c1 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Mon, 24 Sep 2018 10:31:45 +0100 Subject: [PATCH 03/10] Write to multiple shards Send - Write to multiple shards HighWatermarks - return multiple high watermarks --- impl/sql/mutationstorage/mutations.go | 2 +- impl/sql/mutationstorage/queue.go | 28 +++++++++++++------- impl/sql/mutationstorage/queue_test.go | 36 ++++++++++++++++---------- 3 files changed, 43 insertions(+), 23 deletions(-) diff --git a/impl/sql/mutationstorage/mutations.go b/impl/sql/mutationstorage/mutations.go index 588566d58..6a9416f74 100644 --- a/impl/sql/mutationstorage/mutations.go +++ b/impl/sql/mutationstorage/mutations.go @@ -53,7 +53,7 @@ var ( ShardID BIGINT NOT NULL, Time BIGINT NOT NULL, Mutation BLOB NOT NULL, - PRIMARY KEY(DomainID, Time) + PRIMARY KEY(DomainID, ShardID, Time) );`, `CREATE TABLE IF NOT EXISTS Shards ( DomainID VARCHAR(30) NOT NULL, diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 42fc595c0..3caed6239 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -113,16 +113,26 @@ func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mD } // HighWatermark returns the highest timestamp in the mutations table. -func (m *Mutations) HighWatermark(ctx context.Context, domainID string) (int64, error) { - var watermark int64 - if err := m.db.QueryRowContext(ctx, - `SELECT Time FROM Queue WHERE DomainID = ? ORDER BY Time DESC LIMIT 1;`, - domainID).Scan(&watermark); err == sql.ErrNoRows { - return 0, nil - } else if err != nil { - return 0, err +func (m *Mutations) HighWatermark(ctx context.Context, domainID string) (map[int64]int64, error) { + watermarks := make(map[int64]int64) + rows, err := m.db.QueryContext(ctx, + `SELECT ShardID, Max(Time) FROM Queue WHERE DomainID = ? GROUP BY ShardID;`, + domainID) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var shardID, watermark int64 + if err := rows.Scan(&shardID, &watermark); err != nil { + return nil, err + } + watermarks[shardID] = watermark + } + if err := rows.Err(); err != nil { + return nil, err } - return watermark, nil + return watermarks, nil } // ReadQueue reads all mutations that are still in the queue up to batchSize. diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 2dd9d6d06..908eecbfd 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -74,31 +75,40 @@ func TestWatermark(t *testing.T) { ts1 := time.Now() ts2 := ts1.Add(time.Duration(1)) - if err := m.AddShards(ctx, domainID, 1, 2); err != nil { + if err := m.AddShards(ctx, domainID, 1, 2, 3); err != nil { t.Fatalf("AddShards(): %v", err) } for _, tc := range []struct { desc string - send bool - ts time.Time - want int64 + send map[int64]time.Time + want map[int64]int64 }{ - {desc: "no rows", want: 0}, - {desc: "first", send: true, ts: ts1, want: ts1.UnixNano()}, - {desc: "second", send: true, ts: ts2, want: ts2.UnixNano()}, + {desc: "no rows", want: map[int64]int64{}}, + { + desc: "first", + send: map[int64]time.Time{1: ts1}, + want: map[int64]int64{1: ts1.UnixNano()}, + }, + { + desc: "second", + // Highwatermarks in each shard proceed independently. + send: map[int64]time.Time{1: ts2, 2: ts1}, + want: map[int64]int64{1: ts2.UnixNano(), 2: ts1.UnixNano()}, + }, } { - if tc.send { - if err := m.send(ctx, domainID, 1, []byte("foo"), tc.ts); err != nil { - t.Fatalf("send(): %v", err) + for shardID, ts := range tc.send { + if err := m.send(ctx, domainID, shardID, []byte("foo"), ts); err != nil { + t.Fatalf("send(%v, %v): %v", shardID, ts, err) } } - high, err := m.HighWatermark(ctx, domainID) + + highs, err := m.HighWatermark(ctx, domainID) if err != nil { t.Fatalf("HighWatermark(): %v", err) } - if high != tc.want { - t.Errorf("HighWatermark(): %v, want > %v", high, tc.want) + if !cmp.Equal(highs, tc.want) { + t.Errorf("HighWatermark(): %v, want %v", highs, tc.want) } } } From 055ac62eab319c8bdb3ba7fe6d1a46ca174040fb Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Mon, 24 Sep 2018 15:54:23 +0100 Subject: [PATCH 04/10] HighWatermarks metadata proto --- core/sequencer/sequencer_api.proto | 7 +- .../sequencer_go_proto/sequencer_api.pb.go | 78 ++++++++++--------- impl/sql/mutationstorage/mutations.go | 2 +- impl/sql/mutationstorage/queue.go | 38 +++++++-- impl/sql/mutationstorage/queue_test.go | 47 ++++++++++- 5 files changed, 124 insertions(+), 48 deletions(-) diff --git a/core/sequencer/sequencer_api.proto b/core/sequencer/sequencer_api.proto index aa6cab908..210d006f5 100644 --- a/core/sequencer/sequencer_api.proto +++ b/core/sequencer/sequencer_api.proto @@ -25,7 +25,7 @@ package google.keytransparency.sequencer; import "google/protobuf/empty.proto"; message MapMetadata { - // MapSourceSlice is the range of inputs that have been included in a map revision. + // SourceSlice is the range of inputs that have been included in a map revision. message SourceSlice { // lowest_watermark is the lowest primary key (exclusive) the source log // that has been incorporated into this map revision. The primary keys of @@ -36,8 +36,9 @@ message MapMetadata { // of logged items MUST be monotonically increasing. int64 highest_watermark = 2; } - // source defines the range of inputs used for this map revision. - SourceSlice source = 1; + reserved 1; + // sources defines the ranges of inputs used for this map revision for each slice. + map sources = 2; } diff --git a/core/sequencer/sequencer_go_proto/sequencer_api.pb.go b/core/sequencer/sequencer_go_proto/sequencer_api.pb.go index 282977a9e..921255b74 100644 --- a/core/sequencer/sequencer_go_proto/sequencer_api.pb.go +++ b/core/sequencer/sequencer_go_proto/sequencer_api.pb.go @@ -32,11 +32,11 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type MapMetadata struct { - // source defines the range of inputs used for this map revision. - Source *MapMetadata_SourceSlice `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // sources defines the ranges of inputs used for this map revision for each slice. + Sources map[int64]*MapMetadata_SourceSlice `protobuf:"bytes,2,rep,name=sources,proto3" json:"sources,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *MapMetadata) Reset() { *m = MapMetadata{} } @@ -64,14 +64,14 @@ func (m *MapMetadata) XXX_DiscardUnknown() { var xxx_messageInfo_MapMetadata proto.InternalMessageInfo -func (m *MapMetadata) GetSource() *MapMetadata_SourceSlice { +func (m *MapMetadata) GetSources() map[int64]*MapMetadata_SourceSlice { if m != nil { - return m.Source + return m.Sources } return nil } -// MapSourceSlice is the range of inputs that have been included in a map revision. +// SourceSlice is the range of inputs that have been included in a map revision. type MapMetadata_SourceSlice struct { // lowest_watermark is the lowest primary key (exclusive) the source log // that has been incorporated into this map revision. The primary keys of @@ -288,6 +288,7 @@ func (m *PublishBatchRequest) GetDomainId() string { func init() { proto.RegisterType((*MapMetadata)(nil), "google.keytransparency.sequencer.MapMetadata") + proto.RegisterMapType((map[int64]*MapMetadata_SourceSlice)(nil), "google.keytransparency.sequencer.MapMetadata.SourcesEntry") proto.RegisterType((*MapMetadata_SourceSlice)(nil), "google.keytransparency.sequencer.MapMetadata.SourceSlice") proto.RegisterType((*CreateEpochRequest)(nil), "google.keytransparency.sequencer.CreateEpochRequest") proto.RegisterType((*RunBatchRequest)(nil), "google.keytransparency.sequencer.RunBatchRequest") @@ -445,33 +446,36 @@ var _KeyTransparencySequencer_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("sequencer_api.proto", fileDescriptor_0a5d61b2e27141ee) } var fileDescriptor_0a5d61b2e27141ee = []byte{ - // 436 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x5d, 0x6f, 0xd3, 0x30, - 0x14, 0xa5, 0xed, 0x98, 0xd2, 0x9b, 0x49, 0x2b, 0x9e, 0x84, 0xaa, 0xf6, 0x65, 0xca, 0x13, 0x08, - 0xe1, 0x88, 0x02, 0x0f, 0xbc, 0x6e, 0xea, 0x03, 0xa0, 0x49, 0x23, 0x45, 0x9a, 0x84, 0x84, 0x2c, - 0x37, 0xb9, 0x24, 0xd6, 0xe2, 0x38, 0x38, 0x0e, 0x5b, 0x7e, 0x10, 0xbf, 0x80, 0x1f, 0xc1, 0xdf, - 0x42, 0x75, 0x3e, 0x16, 0xc6, 0x47, 0xb6, 0xa7, 0xd6, 0xf7, 0x9e, 0x7b, 0xce, 0x3d, 0xc7, 0x0e, - 0x1c, 0x15, 0xf8, 0xb5, 0xc4, 0x2c, 0x44, 0xcd, 0x78, 0x2e, 0x68, 0xae, 0x95, 0x51, 0xe4, 0x38, - 0x56, 0x2a, 0x4e, 0x91, 0x5e, 0x62, 0x65, 0x34, 0xcf, 0x8a, 0x9c, 0x6b, 0xcc, 0xc2, 0x8a, 0x76, - 0xd8, 0xc5, 0xb2, 0x46, 0xf8, 0x16, 0xbf, 0x2d, 0xbf, 0xf8, 0x28, 0x73, 0x53, 0xd5, 0xe3, 0xde, - 0xcf, 0x11, 0xb8, 0x67, 0x3c, 0x3f, 0x43, 0xc3, 0x23, 0x6e, 0x38, 0xf9, 0x00, 0xfb, 0x85, 0x2a, - 0x75, 0x88, 0xf3, 0xd1, 0xf1, 0xe8, 0x89, 0xbb, 0x7a, 0x43, 0x87, 0xf8, 0x69, 0x6f, 0x9c, 0x6e, - 0xec, 0xec, 0x26, 0x15, 0x21, 0x06, 0x0d, 0xd1, 0x02, 0xc1, 0xed, 0x95, 0xc9, 0x53, 0x98, 0xa5, - 0xea, 0x0a, 0x0b, 0xc3, 0xae, 0xb8, 0x41, 0x2d, 0xb9, 0xbe, 0xb4, 0x5a, 0x93, 0xe0, 0xb0, 0xae, - 0x5f, 0xb4, 0x65, 0xf2, 0x0c, 0x1e, 0x25, 0x22, 0x4e, 0x7e, 0xc7, 0x8e, 0x2d, 0x76, 0xd6, 0x34, - 0x3a, 0xb0, 0xf7, 0x7d, 0x04, 0xe4, 0x54, 0x23, 0x37, 0xb8, 0xce, 0x55, 0x98, 0x04, 0xbb, 0xfd, - 0x0a, 0x43, 0x96, 0x30, 0x8d, 0x94, 0xe4, 0x22, 0x63, 0x22, 0xb2, 0x3a, 0xd3, 0xc0, 0xa9, 0x0b, - 0x6f, 0x23, 0xb2, 0x00, 0x47, 0xe3, 0x37, 0x51, 0x08, 0x95, 0xcd, 0x27, 0x96, 0xb7, 0x3b, 0x93, - 0x73, 0x38, 0x90, 0x3c, 0x67, 0xb2, 0xb1, 0x36, 0xdf, 0xb3, 0x79, 0x3c, 0xbf, 0x57, 0x1e, 0x81, - 0x2b, 0x6f, 0x0e, 0xef, 0xf6, 0x9c, 0xf1, 0x6c, 0xe2, 0x25, 0x70, 0x18, 0x94, 0xd9, 0x09, 0x37, - 0x77, 0xdc, 0x71, 0x09, 0x53, 0x29, 0x32, 0xb6, 0xdd, 0x0d, 0x58, 0xf3, 0x0f, 0x03, 0x47, 0x8a, - 0x9a, 0xc0, 0x36, 0xf9, 0x75, 0xd3, 0x9c, 0x34, 0x4d, 0x7e, 0x6d, 0x9b, 0xde, 0x0a, 0x8e, 0xce, - 0xcb, 0x6d, 0x2a, 0x8a, 0xe4, 0xce, 0x6a, 0xab, 0x1f, 0x63, 0x98, 0xbf, 0xc7, 0xea, 0x63, 0xcf, - 0xda, 0xa6, 0x75, 0x46, 0x2e, 0xc0, 0x69, 0x57, 0x27, 0x2f, 0x86, 0x83, 0xb8, 0x65, 0x73, 0xf1, - 0xb8, 0x1d, 0x69, 0x5f, 0x22, 0x5d, 0xef, 0x5e, 0xa2, 0xf7, 0x80, 0x7c, 0x06, 0xb7, 0x77, 0x75, - 0xe4, 0xd5, 0x30, 0xf7, 0x9f, 0x37, 0xfd, 0x1f, 0x7a, 0x06, 0x07, 0xfd, 0x20, 0xc8, 0xeb, 0x61, - 0xfe, 0xbf, 0x04, 0xf7, 0x6f, 0x81, 0x93, 0xf5, 0xa7, 0xd3, 0x58, 0x98, 0xa4, 0xdc, 0xd2, 0x50, - 0x49, 0xbf, 0xf9, 0xde, 0x6e, 0x91, 0xfb, 0xa1, 0xd2, 0xe8, 0x77, 0x0a, 0x37, 0xff, 0x58, 0xac, - 0x58, 0xcd, 0xb8, 0x6f, 0x7f, 0x5e, 0xfe, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x69, 0x63, 0xbc, 0xd3, - 0xe9, 0x03, 0x00, 0x00, + // 484 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x5d, 0x6f, 0x94, 0x40, + 0x14, 0x75, 0x61, 0xab, 0xec, 0x65, 0x93, 0xe2, 0x34, 0x31, 0x84, 0x7d, 0xd9, 0xf0, 0x54, 0x63, + 0x84, 0xb8, 0x6a, 0xa2, 0x7d, 0x6c, 0xb3, 0x0f, 0xd6, 0x34, 0x36, 0x6c, 0x93, 0x26, 0x26, 0x86, + 0xcc, 0xb2, 0x57, 0x98, 0x2c, 0x30, 0x38, 0x0c, 0x6d, 0xf9, 0x41, 0xfe, 0x02, 0x7f, 0x84, 0x7f, + 0xcb, 0xec, 0x00, 0x5b, 0xac, 0x1f, 0x5b, 0x7d, 0x82, 0x99, 0x73, 0xee, 0x39, 0xe7, 0xde, 0xc9, + 0x85, 0x83, 0x12, 0xbf, 0x54, 0x98, 0x47, 0x28, 0x42, 0x5a, 0x30, 0xaf, 0x10, 0x5c, 0x72, 0x32, + 0x8d, 0x39, 0x8f, 0x53, 0xf4, 0xd6, 0x58, 0x4b, 0x41, 0xf3, 0xb2, 0xa0, 0x02, 0xf3, 0xa8, 0xf6, + 0xb6, 0x5c, 0x67, 0xd2, 0x30, 0x7c, 0xc5, 0x5f, 0x56, 0x9f, 0x7d, 0xcc, 0x0a, 0x59, 0x37, 0xe5, + 0xee, 0x77, 0x0d, 0xcc, 0x33, 0x5a, 0x9c, 0xa1, 0xa4, 0x2b, 0x2a, 0x29, 0xb9, 0x80, 0x47, 0x25, + 0xaf, 0x44, 0x84, 0xa5, 0xad, 0x4d, 0xf5, 0x43, 0x73, 0x76, 0xe4, 0xed, 0x32, 0xf0, 0x7a, 0xf5, + 0xde, 0xa2, 0x29, 0x9e, 0xe7, 0x52, 0xd4, 0x41, 0x27, 0xe5, 0x20, 0x98, 0x0d, 0xb0, 0x48, 0x59, + 0x84, 0xe4, 0x29, 0x58, 0x29, 0xbf, 0xc6, 0x52, 0x86, 0xd7, 0x54, 0xa2, 0xc8, 0xa8, 0x58, 0xdb, + 0x83, 0xe9, 0xe0, 0x50, 0x0f, 0xf6, 0x9b, 0xfb, 0xcb, 0xee, 0x9a, 0x3c, 0x83, 0xc7, 0x09, 0x8b, + 0x93, 0x9f, 0xb9, 0x9a, 0xe2, 0x5a, 0x2d, 0xb0, 0x25, 0x3b, 0x15, 0x8c, 0xfb, 0xfe, 0xc4, 0x02, + 0x7d, 0x8d, 0x75, 0x2b, 0xbd, 0xf9, 0x25, 0x1f, 0x60, 0xef, 0x8a, 0xa6, 0x15, 0x2a, 0x09, 0x73, + 0xf6, 0xf6, 0x7f, 0x9a, 0x53, 0x3d, 0x04, 0x8d, 0xce, 0x91, 0xf6, 0x66, 0x70, 0x3a, 0x34, 0x06, + 0x96, 0xe6, 0x7e, 0x1d, 0x00, 0x39, 0x11, 0x48, 0x25, 0xce, 0x0b, 0x1e, 0x25, 0xc1, 0x46, 0xa1, + 0x94, 0x64, 0x02, 0xa3, 0x15, 0xcf, 0x28, 0xcb, 0x43, 0xb6, 0x52, 0x49, 0x46, 0x81, 0xd1, 0x5c, + 0xbc, 0x5b, 0x11, 0x07, 0x0c, 0x81, 0x57, 0xac, 0x64, 0x3c, 0xb7, 0x75, 0x95, 0x72, 0x7b, 0x26, + 0xe7, 0x30, 0xce, 0x68, 0x11, 0x66, 0xad, 0xb9, 0x3d, 0x54, 0x89, 0x9f, 0xff, 0x53, 0xe2, 0xc0, + 0xcc, 0x6e, 0x0f, 0xa7, 0x43, 0x43, 0xb3, 0x74, 0x37, 0x81, 0xfd, 0xa0, 0xca, 0x8f, 0xa9, 0xbc, + 0x67, 0xc6, 0x09, 0x8c, 0x32, 0x96, 0x87, 0xcb, 0x4d, 0x81, 0x1a, 0xdb, 0x5e, 0x60, 0x64, 0xac, + 0x11, 0x50, 0x20, 0xbd, 0x69, 0x41, 0xbd, 0x05, 0xe9, 0x8d, 0x02, 0xdd, 0x19, 0x1c, 0x9c, 0x57, + 0xcb, 0x94, 0x95, 0xc9, 0xbd, 0xdd, 0x66, 0xdf, 0x34, 0xb0, 0xdf, 0x63, 0x7d, 0xd1, 0x6b, 0x6d, + 0xd1, 0x75, 0x46, 0x2e, 0xc1, 0xe8, 0xa2, 0x93, 0x17, 0xbb, 0x07, 0x71, 0xa7, 0x4d, 0xe7, 0x49, + 0x57, 0xd2, 0x6d, 0x82, 0x37, 0xdf, 0x6c, 0x82, 0xfb, 0x80, 0x7c, 0x02, 0xb3, 0xf7, 0x74, 0xe4, + 0xd5, 0x6e, 0xed, 0x5f, 0x5f, 0xfa, 0x2f, 0xf2, 0x21, 0x8c, 0xfb, 0x83, 0x20, 0xaf, 0x77, 0xeb, + 0xff, 0x66, 0x70, 0x7f, 0x36, 0x38, 0x9e, 0x7f, 0x3c, 0x89, 0x99, 0x4c, 0xaa, 0xa5, 0x17, 0xf1, + 0xcc, 0x6f, 0xf7, 0xfd, 0x8e, 0xb8, 0x1f, 0x71, 0x81, 0xfe, 0xd6, 0xe1, 0xf6, 0x2f, 0x8c, 0x79, + 0xd8, 0x28, 0x3e, 0x54, 0x9f, 0x97, 0x3f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x6b, 0xbf, 0xeb, 0xe0, + 0x69, 0x04, 0x00, 0x00, } diff --git a/impl/sql/mutationstorage/mutations.go b/impl/sql/mutationstorage/mutations.go index 6a9416f74..0d300cd00 100644 --- a/impl/sql/mutationstorage/mutations.go +++ b/impl/sql/mutationstorage/mutations.go @@ -58,7 +58,7 @@ var ( `CREATE TABLE IF NOT EXISTS Shards ( DomainID VARCHAR(30) NOT NULL, ShardID BIGINT NOT NULL, - Write INT NOT NULL, + Enabled INTEGER NOT NULL, PRIMARY KEY(DomainID, ShardID) );`, } diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 3caed6239..983a62ef1 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -17,6 +17,7 @@ package mutationstorage import ( "context" "database/sql" + "math/rand" "time" "github.com/google/keytransparency/core/mutator" @@ -34,7 +35,7 @@ func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ... glog.Infof("mutationstorage: AddShard(%v, %v)", domainID, shardIDs) for _, shardID := range shardIDs { if _, err := m.db.ExecContext(ctx, - `INSERT INTO Shards (DomainID, ShardID, Write) Values(?, ?, ?);`, + `INSERT INTO Shards (DomainID, ShardID, Enabled) Values(?, ?, ?);`, domainID, shardID, true); err != nil { return err } @@ -42,11 +43,36 @@ func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ... return nil } +// randShard returns a random, enabled shard for domainID. +func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, error) { + // Read all enabled shards for domainID. + // TODO(gbelvin): Cache these results. + var shardIDs []int64 + rows, err := m.db.QueryContext(ctx, + `SELECT ShardID from Shards WHERE DomainID = ? AND Enabled = ?;`, + domainID, true) + if err != nil { + return 0, err + } + for rows.Next() { + var shardID int64 + rows.Scan(&shardID) + shardIDs = append(shardIDs, shardID) + } + if err := rows.Err(); err != nil { + return 0, err + } + if len(shardIDs) == 0 { + return 0, status.Errorf(codes.NotFound, "No shard found for domain %v", domainID) + } + + // Return a random shard. + return shardIDs[rand.Intn(len(shardIDs))], nil +} + // Send writes mutations to the leading edge (by sequence number) of the mutations table. func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryUpdate) error { glog.Infof("mutationstorage: Send(%v, )", domainID) - // TODO(gbelvin): Implement retry with backoff for retryable errors if - // we get timestamp contention. shardID, err := m.randomShard(ctx, domainID) if err != nil { return err @@ -55,6 +81,8 @@ func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryU if err != nil { return err } + // TODO(gbelvin): Implement retry with backoff for retryable errors if + // we get timestamp contention. return m.send(ctx, domainID, shardID, mData, time.Now()) } @@ -112,8 +140,8 @@ func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mD return tx.Commit() } -// HighWatermark returns the highest timestamp in the mutations table. -func (m *Mutations) HighWatermark(ctx context.Context, domainID string) (map[int64]int64, error) { +// HighWatermarks returns the highest timestamp in the mutations table. +func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) { watermarks := make(map[int64]int64) rows, err := m.db.QueryContext(ctx, `SELECT ShardID, Max(Time) FROM Queue WHERE DomainID = ? GROUP BY ShardID;`, diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 908eecbfd..347b4e86e 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -27,6 +27,50 @@ import ( _ "github.com/mattn/go-sqlite3" ) +func TestRandShard(t *testing.T) { + ctx := context.Background() + db := newDB(t) + m, err := New(db) + if err != nil { + t.Fatalf("Failed to create Mutations: %v", err) + } + domainID := "foo" + + for _, tc := range []struct { + desc string + send bool + wantCode codes.Code + wantShards map[int64]bool + }{ + {desc: "no rows", wantCode: codes.NotFound, wantShards: map[int64]bool{}}, + {desc: "second", send: true, wantShards: map[int64]bool{ + 1: true, + 2: true, + 3: true, + }}, + } { + if tc.send { + if err := m.AddShards(ctx, domainID, 1, 2, 3); err != nil { + t.Fatalf("AddShards(): %v", err) + } + } + shards := make(map[int64]bool) + for i := 0; i < 20; i++ { + shard, err := m.randShard(ctx, domainID) + if got, want := status.Code(err), tc.wantCode; got != want { + t.Errorf("randShard(): %v, want %v", got, want) + } + if err != nil { + break + } + shards[shard] = true + } + if got, want := shards, tc.wantShards; !cmp.Equal(got, want) { + t.Errorf("shards: %v, want %v", got, want) + } + } +} + func TestSend(t *testing.T) { ctx := context.Background() db := newDB(t) @@ -102,8 +146,7 @@ func TestWatermark(t *testing.T) { t.Fatalf("send(%v, %v): %v", shardID, ts, err) } } - - highs, err := m.HighWatermark(ctx, domainID) + highs, err := m.HighWatermarks(ctx, domainID) if err != nil { t.Fatalf("HighWatermark(): %v", err) } From 39122ea8ad91ed0f84ecf956d9b99fab22292015 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Mon, 24 Sep 2018 15:59:41 +0100 Subject: [PATCH 05/10] Read from multiple shards --- core/sequencer/server.go | 57 ++++++++++++++++--------------- impl/sql/mutationstorage/queue.go | 12 ++++--- 2 files changed, 36 insertions(+), 33 deletions(-) diff --git a/core/sequencer/server.go b/core/sequencer/server.go index 96f6416e6..bbcf74a1f 100644 --- a/core/sequencer/server.go +++ b/core/sequencer/server.go @@ -70,11 +70,10 @@ func createMetrics(mf monitoring.MetricFactory) { // Queue reads messages that haven't been deleted off the queue. type Queue interface { - // HighWatermark returns the highest primary key in the mutations table for DomainID. - HighWatermark(ctx context.Context, domainID string) (int64, error) - // ReadQueue returns the messages between (low, high] for domainID. - // TODO(gbelvin): Add paging API back in to support sharded reads. - ReadQueue(ctx context.Context, domainID string, low, high int64) ([]*mutator.QueueMessage, error) + // HighWatermark returns the highest timestamp in the mutations table. + HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) + // Read returns up to batchSize messages for domainID. + ReadQueue(ctx context.Context, domainID string, shard, low, high int64) ([]*mutator.QueueMessage, error) } // Server implements KeyTransparencySequencerServer. @@ -126,7 +125,7 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty. if err := proto.Unmarshal(latestMapRoot.Metadata, &lastMeta); err != nil { return nil, err } - high, err := s.queue.HighWatermark(ctx, in.DomainId) + highs, err := s.queue.HighWatermarks(ctx, in.DomainId) if err != nil { return nil, status.Errorf(codes.Internal, "HighWatermark(): %v", err) } @@ -138,42 +137,44 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty. // Count items to be processed. Unfortunately, this means we will be // reading the items to be processed twice. Once, here in RunBatch and // again in CreateEpoch. - metadata := &spb.MapMetadata{ - Source: &spb.MapMetadata_SourceSlice{ - LowestWatermark: lastMeta.GetSource().GetHighestWatermark(), + sources := make(map[int64]*spb.MapMetadata_SourceSlice) + for sliceID, high := range highs { + sources[sliceID] = &spb.MapMetadata_SourceSlice{ + LowestWatermark: lastMeta.Sources[sliceID].GetHighestWatermark(), HighestWatermark: high, - }, + } } - msgs, err := s.readMessages(ctx, in.DomainId, metadata.GetSource()) + msgs, err := s.readMessages(ctx, in.DomainId, sources) if err != nil { return nil, err } - if int32(len(msgs)) < in.MinBatch { + if len(msgs) < int(in.MinBatch) { return &empty.Empty{}, nil } return s.CreateEpoch(ctx, &spb.CreateEpochRequest{ DomainId: in.DomainId, Revision: int64(latestMapRoot.Revision) + 1, - MapMetadata: metadata, + MapMetadata: &spb.MapMetadata{Sources: sources}, }) } func (s *Server) readMessages(ctx context.Context, domainID string, - source *spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) { - // Read mutations - batch, err := s.queue.ReadQueue(ctx, domainID, - source.GetLowestWatermark(), source.GetHighestWatermark()) - if err != nil { - return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err) - } - msgs := make([]*ktpb.EntryUpdate, 0, len(batch)) - for _, m := range batch { - msgs = append(msgs, &ktpb.EntryUpdate{ - Mutation: m.Mutation, - Committed: m.ExtraData, - }) + sources map[int64]*spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) { + msgs := make([]*ktpb.EntryUpdate, 0) + for shardID, source := range sources { + batch, err := s.queue.ReadQueue(ctx, domainID, shardID, + source.GetLowestWatermark(), source.GetHighestWatermark()) + if err != nil { + return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err) + } + for _, m := range batch { + msgs = append(msgs, &ktpb.EntryUpdate{ + Mutation: m.Mutation, + Committed: m.ExtraData, + }) + } } return msgs, nil } @@ -181,10 +182,10 @@ func (s *Server) readMessages(ctx context.Context, domainID string, // CreateEpoch applies the supplied mutations to the current map revision and creates a new epoch. func (s *Server) CreateEpoch(ctx context.Context, in *spb.CreateEpochRequest) (*empty.Empty, error) { domainID := in.GetDomainId() - if in.MapMetadata.GetSource() == nil { + if in.MapMetadata.GetSources() == nil { return nil, status.Errorf(codes.InvalidArgument, "missing map metadata") } - msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSource()) + msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSources()) if err != nil { return nil, err } diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 983a62ef1..458062028 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -56,7 +56,9 @@ func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, erro } for rows.Next() { var shardID int64 - rows.Scan(&shardID) + if err := rows.Scan(&shardID); err != nil { + return 0, err + } shardIDs = append(shardIDs, shardID) } if err := rows.Err(); err != nil { @@ -163,12 +165,12 @@ func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[in return watermarks, nil } -// ReadQueue reads all mutations that are still in the queue up to batchSize. -func (m *Mutations) ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) { +// ReadQueue reads all mutations that are still in the queue. +func (m *Mutations) ReadQueue(ctx context.Context, + domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) { rows, err := m.db.QueryContext(ctx, `SELECT Time, Mutation FROM Queue - WHERE DomainID = ? AND ShardID = ? - AND Time > ? AND Time <= ? + WHERE DomainID = ? AND ShardID = ? AND Time > ? AND Time <= ? ORDER BY Time ASC;`, domainID, shardID, low, high) if err != nil { From 0748802ff14a30738f264b5beae1b36c39b5b4e1 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Tue, 25 Sep 2018 14:53:28 +0100 Subject: [PATCH 06/10] Create new shards during domain creation --- cmd/keytransparency-sequencer/main.go | 2 +- core/adminserver/admin_server.go | 41 +++++++++++++++++++-------- core/adminserver/admin_server_test.go | 12 ++++++-- impl/integration/env.go | 10 +++---- 4 files changed, 44 insertions(+), 21 deletions(-) diff --git a/cmd/keytransparency-sequencer/main.go b/cmd/keytransparency-sequencer/main.go index 5232794ac..a561fff5d 100644 --- a/cmd/keytransparency-sequencer/main.go +++ b/cmd/keytransparency-sequencer/main.go @@ -113,7 +113,7 @@ func main() { keygen := func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) { return der.NewProtoFromSpec(spec) } - adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, keygen) + adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, mutations, keygen) glog.Infof("Signer starting") // Run servers diff --git a/core/adminserver/admin_server.go b/core/adminserver/admin_server.go index 64d0d392b..0e1aca980 100644 --- a/core/adminserver/admin_server.go +++ b/core/adminserver/admin_server.go @@ -75,14 +75,21 @@ var ( } ) +// QueueAdmin controls the lifecycle and scaling of mutation queues. +type QueueAdmin interface { + // AddShards creates and adds new shards for queue writing to a domain. + AddShards(ctx context.Context, domainID string, shardIDs ...int64) error +} + // Server implements pb.KeyTransparencyAdminServer type Server struct { - tlog tpb.TrillianLogClient - tmap tpb.TrillianMapClient - logAdmin tpb.TrillianAdminClient - mapAdmin tpb.TrillianAdminClient - domains domain.Storage - keygen keys.ProtoGenerator + tlog tpb.TrillianLogClient + tmap tpb.TrillianMapClient + logAdmin tpb.TrillianAdminClient + mapAdmin tpb.TrillianAdminClient + domains domain.Storage + queueAdmin QueueAdmin + keygen keys.ProtoGenerator } // New returns a KeyTransparencyAdmin implementation. @@ -91,15 +98,17 @@ func New( tmap tpb.TrillianMapClient, logAdmin, mapAdmin tpb.TrillianAdminClient, domains domain.Storage, + queueAdmin QueueAdmin, keygen keys.ProtoGenerator, ) *Server { return &Server{ - tlog: tlog, - tmap: tmap, - logAdmin: logAdmin, - mapAdmin: mapAdmin, - domains: domains, - keygen: keygen, + tlog: tlog, + tmap: tmap, + logAdmin: logAdmin, + mapAdmin: mapAdmin, + domains: domains, + queueAdmin: queueAdmin, + keygen: keygen, } } @@ -249,6 +258,7 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) ( err, logTree.TreeId, delLogErr, mapTree.TreeId, delMapErr) } + // Create domain - {log, map} binding. if err := s.domains.Write(ctx, &domain.Domain{ DomainID: in.GetDomainId(), MapID: mapTree.TreeId, @@ -260,6 +270,13 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) ( }); err != nil { return nil, fmt.Errorf("adminserver: domains.Write(): %v", err) } + + // Create shards for queue. + shardIDs := []int64{1} + if err := s.queueAdmin.AddShards(ctx, in.GetDomainId(), shardIDs...); err != nil { + return nil, fmt.Errorf("adminserver: AddShards(%v): %v", shardIDs, err) + } + d := &pb.Domain{ DomainId: in.GetDomainId(), Log: logTree, diff --git a/core/adminserver/admin_server_test.go b/core/adminserver/admin_server_test.go index f63bdf8c4..6e70234bf 100644 --- a/core/adminserver/admin_server_test.go +++ b/core/adminserver/admin_server_test.go @@ -85,6 +85,12 @@ func (e *miniEnv) Close() { e.stopMockServer() } +type fakeQueueAdmin struct{} + +func (*fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { + return nil +} + func TestCreateDomain(t *testing.T) { for _, tc := range []struct { desc string @@ -171,7 +177,7 @@ func TestCreateRead(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainID string @@ -223,7 +229,7 @@ func TestDelete(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainID string @@ -287,7 +293,7 @@ func TestListDomains(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainIDs []string diff --git a/impl/integration/env.go b/impl/integration/env.go index 931d0a04b..3c644c671 100644 --- a/impl/integration/env.go +++ b/impl/integration/env.go @@ -141,7 +141,11 @@ func NewEnv(ctx context.Context) (*Env, error) { if err != nil { return nil, fmt.Errorf("env: failed to create domain storage: %v", err) } - adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, vrfKeyGen) + mutations, err := mutationstorage.New(db) + if err != nil { + return nil, fmt.Errorf("env: Failed to create mutations object: %v", err) + } + adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, mutations, vrfKeyGen) cctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() domainPB, err := adminSvr.CreateDomain(cctx, &pb.CreateDomainRequest{ @@ -158,10 +162,6 @@ func NewEnv(ctx context.Context) (*Env, error) { glog.V(5).Infof("Domain: %# v", pretty.Formatter(domainPB)) // Common data structures. - mutations, err := mutationstorage.New(db) - if err != nil { - return nil, fmt.Errorf("env: Failed to create mutations object: %v", err) - } authFunc := authentication.FakeAuthFunc authz := &authorization.AuthzPolicy{} From 2b398b2ef76f091e3721d190dad17552d737ee59 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Fri, 5 Oct 2018 13:58:51 +0100 Subject: [PATCH 07/10] SQL independent randomShard --- impl/sql/mutationstorage/queue.go | 48 +++++++++++-------------------- 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 458062028..39aff303b 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -43,9 +43,24 @@ func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ... return nil } +// Send writes mutations to the leading edge (by sequence number) of the mutations table. +func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryUpdate) error { + glog.Infof("mutationstorage: Send(%v, )", domainID) + shardID, err := m.randShard(ctx, domainID) + if err != nil { + return err + } + mData, err := proto.Marshal(update) + if err != nil { + return err + } + // TODO(gbelvin): Implement retry with backoff for retryable errors if + // we get timestamp contention. + return m.send(ctx, domainID, shardID, mData, time.Now()) +} + // randShard returns a random, enabled shard for domainID. func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, error) { - // Read all enabled shards for domainID. // TODO(gbelvin): Cache these results. var shardIDs []int64 rows, err := m.db.QueryContext(ctx, @@ -72,37 +87,6 @@ func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, erro return shardIDs[rand.Intn(len(shardIDs))], nil } -// Send writes mutations to the leading edge (by sequence number) of the mutations table. -func (m *Mutations) Send(ctx context.Context, domainID string, update *pb.EntryUpdate) error { - glog.Infof("mutationstorage: Send(%v, )", domainID) - shardID, err := m.randomShard(ctx, domainID) - if err != nil { - return err - } - mData, err := proto.Marshal(update) - if err != nil { - return err - } - // TODO(gbelvin): Implement retry with backoff for retryable errors if - // we get timestamp contention. - return m.send(ctx, domainID, shardID, mData, time.Now()) -} - -// randomShard returns a random shard from the list of active shards for domainID. -func (m *Mutations) randomShard(ctx context.Context, domainID string) (int64, error) { - var shardID int64 - err := m.db.QueryRowContext(ctx, - `SELECT ShardID from Shards WHERE DomainID = ? ORDER BY RANDOM() LIMIT 1;`, - domainID).Scan(&shardID) - switch { - case err == sql.ErrNoRows: - return 0, status.Errorf(codes.NotFound, "No shard found for domain %v", domainID) - case err != nil: - return 0, err - } - return shardID, nil -} - // ts must be greater than all other timestamps currently recorded for domainID. func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mData []byte, ts time.Time) error { tx, err := m.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) From 6ebf36cc8973dfd7448c963d14f055ac4149c45a Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Fri, 5 Oct 2018 14:12:05 +0100 Subject: [PATCH 08/10] Use constant for repeated string --- impl/sql/mutationstorage/queue_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 347b4e86e..91719955c 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -34,7 +34,6 @@ func TestRandShard(t *testing.T) { if err != nil { t.Fatalf("Failed to create Mutations: %v", err) } - domainID := "foo" for _, tc := range []struct { desc string @@ -78,7 +77,6 @@ func TestSend(t *testing.T) { if err != nil { t.Fatalf("Failed to create Mutations: %v", err) } - domainID := "foo" update := []byte("bar") ts1 := time.Now() ts2 := ts1.Add(time.Duration(1)) @@ -115,7 +113,6 @@ func TestWatermark(t *testing.T) { if err != nil { t.Fatalf("Failed to create Mutations: %v", err) } - domainID := "foo" ts1 := time.Now() ts2 := ts1.Add(time.Duration(1)) @@ -142,7 +139,7 @@ func TestWatermark(t *testing.T) { }, } { for shardID, ts := range tc.send { - if err := m.send(ctx, domainID, shardID, []byte("foo"), ts); err != nil { + if err := m.send(ctx, domainID, shardID, []byte("mutation"), ts); err != nil { t.Fatalf("send(%v, %v): %v", shardID, ts, err) } } @@ -163,7 +160,6 @@ func TestReadQueue(t *testing.T) { if err != nil { t.Fatalf("Failed to create mutations: %v", err) } - domainID := "readqueue" shardID := int64(5) if err := m.AddShards(ctx, domainID, shardID); err != nil { t.Fatalf("AddShards(): %v", err) From b48a1c1cc307275a74c5879ee6a16c1d131a0991 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Thu, 11 Oct 2018 16:20:08 +0100 Subject: [PATCH 09/10] Cleanup comments --- core/adminserver/admin_server.go | 5 +++-- core/adminserver/admin_server_test.go | 8 ++++---- core/sequencer/sequencer_api.proto | 9 +++++---- core/sequencer/server.go | 7 ++++--- impl/sql/mutationstorage/queue.go | 13 ++++++++----- impl/sql/mutationstorage/queue_test.go | 14 ++++++-------- 6 files changed, 30 insertions(+), 26 deletions(-) diff --git a/core/adminserver/admin_server.go b/core/adminserver/admin_server.go index 0e1aca980..bd4cf7dce 100644 --- a/core/adminserver/admin_server.go +++ b/core/adminserver/admin_server.go @@ -271,8 +271,9 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) ( return nil, fmt.Errorf("adminserver: domains.Write(): %v", err) } - // Create shards for queue. - shardIDs := []int64{1} + // Create initial shards for queue. + // TODO(#1063): Additional shards can be added at a later point to support increased server load. + shardIDs := []int64{1, 2} if err := s.queueAdmin.AddShards(ctx, in.GetDomainId(), shardIDs...); err != nil { return nil, fmt.Errorf("adminserver: AddShards(%v): %v", shardIDs, err) } diff --git a/core/adminserver/admin_server_test.go b/core/adminserver/admin_server_test.go index 6e70234bf..b3f09a42e 100644 --- a/core/adminserver/admin_server_test.go +++ b/core/adminserver/admin_server_test.go @@ -87,7 +87,7 @@ func (e *miniEnv) Close() { type fakeQueueAdmin struct{} -func (*fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { +func (fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { return nil } @@ -177,7 +177,7 @@ func TestCreateRead(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainID string @@ -229,7 +229,7 @@ func TestDelete(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainID string @@ -293,7 +293,7 @@ func TestListDomains(t *testing.T) { t.Fatalf("Failed to create trillian log server: %v", err) } - svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen) + svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen) for _, tc := range []struct { domainIDs []string diff --git a/core/sequencer/sequencer_api.proto b/core/sequencer/sequencer_api.proto index 210d006f5..10b4dd6f6 100644 --- a/core/sequencer/sequencer_api.proto +++ b/core/sequencer/sequencer_api.proto @@ -27,9 +27,9 @@ import "google/protobuf/empty.proto"; message MapMetadata { // SourceSlice is the range of inputs that have been included in a map revision. message SourceSlice { - // lowest_watermark is the lowest primary key (exclusive) the source log - // that has been incorporated into this map revision. The primary keys of - // logged items MUST be monotonically increasing. + // lowest_watermark is the lowest primary key (exclusive) of the source + // log that has been incorporated into this map revision. The primary + // keys of logged items MUST be monotonically increasing. int64 lowest_watermark = 1; // highest_watermark is the highest primary key (inclusive) of the source // log that has been incorporated into this map revision. The primary keys @@ -37,7 +37,8 @@ message MapMetadata { int64 highest_watermark = 2; } reserved 1; - // sources defines the ranges of inputs used for this map revision for each slice. + // sources is a map from log source IDs to the (low, high] range of primary keys + // in each slice that were used to construct this map revision. map sources = 2; } diff --git a/core/sequencer/server.go b/core/sequencer/server.go index bbcf74a1f..8a8ff5993 100644 --- a/core/sequencer/server.go +++ b/core/sequencer/server.go @@ -70,10 +70,11 @@ func createMetrics(mf monitoring.MetricFactory) { // Queue reads messages that haven't been deleted off the queue. type Queue interface { - // HighWatermark returns the highest timestamp in the mutations table. + // HighWatermarks returns the highest primary key for each shard in the mutations table. HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) - // Read returns up to batchSize messages for domainID. - ReadQueue(ctx context.Context, domainID string, shard, low, high int64) ([]*mutator.QueueMessage, error) + // ReadQueue returns the messages under shardID in the (low, high] range. + // ReadQueue does NOT delete messages + ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) } // Server implements KeyTransparencySequencerServer. diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 39aff303b..25aa5a15d 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -20,13 +20,12 @@ import ( "math/rand" "time" + "github.com/golang/glog" + "github.com/golang/protobuf/proto" "github.com/google/keytransparency/core/mutator" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/golang/glog" - "github.com/golang/protobuf/proto" - pb "github.com/google/keytransparency/core/api/v1/keytransparency_go_proto" ) @@ -34,6 +33,9 @@ import ( func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error { glog.Infof("mutationstorage: AddShard(%v, %v)", domainID, shardIDs) for _, shardID := range shardIDs { + // TODO(gdbelvin): Use INSERT IGNORE to allow this function to be retried. + // TODO(gdbelvin): Migrate to a MySQL Docker image for unit tests. + // MySQL and SQLite do not have the same syntax for INSERT IGNORE. if _, err := m.db.ExecContext(ctx, `INSERT INTO Shards (DomainID, ShardID, Enabled) Values(?, ?, ?);`, domainID, shardID, true); err != nil { @@ -69,6 +71,7 @@ func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, erro if err != nil { return 0, err } + defer rows.Close() for rows.Next() { var shardID int64 if err := rows.Scan(&shardID); err != nil { @@ -126,7 +129,7 @@ func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mD return tx.Commit() } -// HighWatermarks returns the highest timestamp in the mutations table. +// HighWatermarks returns the highest timestamp for each shard in the mutations table. func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) { watermarks := make(map[int64]int64) rows, err := m.db.QueryContext(ctx, @@ -149,7 +152,7 @@ func (m *Mutations) HighWatermarks(ctx context.Context, domainID string) (map[in return watermarks, nil } -// ReadQueue reads all mutations that are still in the queue. +// ReadQueue reads all mutations in shardID between (low, high]. func (m *Mutations) ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) { rows, err := m.db.QueryContext(ctx, diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 91719955c..7d86c35f7 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -37,24 +37,22 @@ func TestRandShard(t *testing.T) { for _, tc := range []struct { desc string - send bool + send []int64 wantCode codes.Code wantShards map[int64]bool }{ {desc: "no rows", wantCode: codes.NotFound, wantShards: map[int64]bool{}}, - {desc: "second", send: true, wantShards: map[int64]bool{ + {desc: "second", send: []int64{1, 2, 3}, wantShards: map[int64]bool{ 1: true, 2: true, 3: true, }}, } { - if tc.send { - if err := m.AddShards(ctx, domainID, 1, 2, 3); err != nil { - t.Fatalf("AddShards(): %v", err) - } + if err := m.AddShards(ctx, domainID, tc.send...); err != nil { + t.Fatalf("AddShards(): %v", err) } shards := make(map[int64]bool) - for i := 0; i < 20; i++ { + for i := 0; i < 10*len(tc.wantShards); i++ { shard, err := m.randShard(ctx, domainID) if got, want := status.Code(err), tc.wantCode; got != want { t.Errorf("randShard(): %v, want %v", got, want) @@ -106,7 +104,7 @@ func TestSend(t *testing.T) { } } -func TestWatermark(t *testing.T) { +func TestWatermarks(t *testing.T) { ctx := context.Background() db := newDB(t) m, err := New(db) From 69e197ccae0ab0cc672caf471b92b017e36c6745 Mon Sep 17 00:00:00 2001 From: Gary Belvin Date: Thu, 11 Oct 2018 18:01:08 +0100 Subject: [PATCH 10/10] Nits and improved error handling --- core/sequencer/server.go | 2 +- impl/sql/mutationstorage/queue.go | 22 +++++------- impl/sql/mutationstorage/queue_test.go | 46 ++++++++++++++------------ 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/core/sequencer/server.go b/core/sequencer/server.go index 8a8ff5993..d46184ddb 100644 --- a/core/sequencer/server.go +++ b/core/sequencer/server.go @@ -73,7 +73,7 @@ type Queue interface { // HighWatermarks returns the highest primary key for each shard in the mutations table. HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) // ReadQueue returns the messages under shardID in the (low, high] range. - // ReadQueue does NOT delete messages + // ReadQueue does NOT delete messages. ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) } diff --git a/impl/sql/mutationstorage/queue.go b/impl/sql/mutationstorage/queue.go index 25aa5a15d..565e2d115 100644 --- a/impl/sql/mutationstorage/queue.go +++ b/impl/sql/mutationstorage/queue.go @@ -91,27 +91,27 @@ func (m *Mutations) randShard(ctx context.Context, domainID string) (int64, erro } // ts must be greater than all other timestamps currently recorded for domainID. -func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mData []byte, ts time.Time) error { +func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mData []byte, ts time.Time) (ret error) { tx, err := m.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) if err != nil { return err } + defer func() { + if ret != nil { + if err := tx.Rollback(); err != nil { + ret = status.Errorf(codes.Internal, "%v, and could not rollback: %v", ret, err) + } + } + }() var maxTime int64 if err := tx.QueryRowContext(ctx, `SELECT COALESCE(MAX(Time), 0) FROM Queue WHERE DomainID = ? AND ShardID = ?;`, domainID, shardID).Scan(&maxTime); err != nil { - if rollbackErr := tx.Rollback(); rollbackErr != nil { - return status.Errorf(codes.Internal, - "query err: %v and could not roll back: %v", err, rollbackErr) - } - return err + return status.Errorf(codes.Internal, "could not find max timestamp: %v", err) } tsTime := ts.UnixNano() if tsTime <= maxTime { - if rollbackErr := tx.Rollback(); rollbackErr != nil { - return status.Errorf(codes.Internal, "could not roll back: %v", rollbackErr) - } return status.Errorf(codes.Aborted, "current timestamp: %v, want > max-timestamp of queued mutations: %v", tsTime, maxTime) @@ -120,10 +120,6 @@ func (m *Mutations) send(ctx context.Context, domainID string, shardID int64, mD if _, err = tx.ExecContext(ctx, `INSERT INTO Queue (DomainID, ShardID, Time, Mutation) VALUES (?, ?, ?, ?);`, domainID, shardID, tsTime, mData); err != nil { - if rollbackErr := tx.Rollback(); rollbackErr != nil { - return status.Errorf(codes.Internal, - "insert err: %v and could not roll back: %v", err, rollbackErr) - } return status.Errorf(codes.Internal, "failed inserting into queue: %v", err) } return tx.Commit() diff --git a/impl/sql/mutationstorage/queue_test.go b/impl/sql/mutationstorage/queue_test.go index 7d86c35f7..f320f3a3c 100644 --- a/impl/sql/mutationstorage/queue_test.go +++ b/impl/sql/mutationstorage/queue_test.go @@ -29,11 +29,6 @@ import ( func TestRandShard(t *testing.T) { ctx := context.Background() - db := newDB(t) - m, err := New(db) - if err != nil { - t.Fatalf("Failed to create Mutations: %v", err) - } for _, tc := range []struct { desc string @@ -42,29 +37,36 @@ func TestRandShard(t *testing.T) { wantShards map[int64]bool }{ {desc: "no rows", wantCode: codes.NotFound, wantShards: map[int64]bool{}}, + {desc: "one row", send: []int64{10}, wantShards: map[int64]bool{10: true}}, {desc: "second", send: []int64{1, 2, 3}, wantShards: map[int64]bool{ 1: true, 2: true, 3: true, }}, } { - if err := m.AddShards(ctx, domainID, tc.send...); err != nil { - t.Fatalf("AddShards(): %v", err) - } - shards := make(map[int64]bool) - for i := 0; i < 10*len(tc.wantShards); i++ { - shard, err := m.randShard(ctx, domainID) - if got, want := status.Code(err), tc.wantCode; got != want { - t.Errorf("randShard(): %v, want %v", got, want) - } + t.Run(tc.desc, func(t *testing.T) { + m, err := New(newDB(t)) if err != nil { - break + t.Fatalf("Failed to create Mutations: %v", err) } - shards[shard] = true - } - if got, want := shards, tc.wantShards; !cmp.Equal(got, want) { - t.Errorf("shards: %v, want %v", got, want) - } + if err := m.AddShards(ctx, domainID, tc.send...); err != nil { + t.Fatalf("AddShards(): %v", err) + } + shards := make(map[int64]bool) + for i := 0; i < 10*len(tc.wantShards); i++ { + shard, err := m.randShard(ctx, domainID) + if got, want := status.Code(err), tc.wantCode; got != want { + t.Errorf("randShard(): %v, want %v", got, want) + } + if err != nil { + break + } + shards[shard] = true + } + if got, want := shards, tc.wantShards; !cmp.Equal(got, want) { + t.Errorf("shards: %v, want %v", got, want) + } + }) } } @@ -143,10 +145,10 @@ func TestWatermarks(t *testing.T) { } highs, err := m.HighWatermarks(ctx, domainID) if err != nil { - t.Fatalf("HighWatermark(): %v", err) + t.Fatalf("HighWatermarks(): %v", err) } if !cmp.Equal(highs, tc.want) { - t.Errorf("HighWatermark(): %v, want %v", highs, tc.want) + t.Errorf("HighWatermarks(): %v, want %v", highs, tc.want) } } }