This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Scale mutation queue with multiple shards #1048

Merged: 11 commits, Oct 11, 2018
2 changes: 1 addition & 1 deletion cmd/keytransparency-sequencer/main.go
@@ -113,7 +113,7 @@ func main() {
keygen := func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) {
return der.NewProtoFromSpec(spec)
}
adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, keygen)
adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, mutations, keygen)
glog.Infof("Signer starting")

// Run servers
41 changes: 29 additions & 12 deletions core/adminserver/admin_server.go
@@ -75,14 +75,21 @@ var (
}
)

// QueueAdmin controls the lifecycle and scaling of mutation queues.
type QueueAdmin interface {
// AddShards creates and adds new shards for queue writing to a domain.
AddShards(ctx context.Context, domainID string, shardIDs ...int64) error
}

// Server implements pb.KeyTransparencyAdminServer
type Server struct {
tlog tpb.TrillianLogClient
tmap tpb.TrillianMapClient
logAdmin tpb.TrillianAdminClient
mapAdmin tpb.TrillianAdminClient
domains domain.Storage
keygen keys.ProtoGenerator
tlog tpb.TrillianLogClient
tmap tpb.TrillianMapClient
logAdmin tpb.TrillianAdminClient
mapAdmin tpb.TrillianAdminClient
domains domain.Storage
queueAdmin QueueAdmin
keygen keys.ProtoGenerator
}

// New returns a KeyTransparencyAdmin implementation.
@@ -91,15 +98,17 @@ func New(
tmap tpb.TrillianMapClient,
logAdmin, mapAdmin tpb.TrillianAdminClient,
domains domain.Storage,
queueAdmin QueueAdmin,
keygen keys.ProtoGenerator,
) *Server {
return &Server{
tlog: tlog,
tmap: tmap,
logAdmin: logAdmin,
mapAdmin: mapAdmin,
domains: domains,
keygen: keygen,
tlog: tlog,
tmap: tmap,
logAdmin: logAdmin,
mapAdmin: mapAdmin,
domains: domains,
queueAdmin: queueAdmin,
keygen: keygen,
}
}

@@ -249,6 +258,7 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
err, logTree.TreeId, delLogErr, mapTree.TreeId, delMapErr)
}

// Create domain - {log, map} binding.
if err := s.domains.Write(ctx, &domain.Domain{
DomainID: in.GetDomainId(),
MapID: mapTree.TreeId,
@@ -260,6 +270,13 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
}); err != nil {
return nil, fmt.Errorf("adminserver: domains.Write(): %v", err)
}

// Create shards for queue.
shardIDs := []int64{1}
Contributor: Who is supposed to create shards if there are more than one? This looks like a place for a TODO.

Contributor Author: Created issue #1048

Contributor: You probably meant #1063 (putting it here to create a link between GitHub issues).

if err := s.queueAdmin.AddShards(ctx, in.GetDomainId(), shardIDs...); err != nil {
return nil, fmt.Errorf("adminserver: AddShards(%v): %v", shardIDs, err)
}

d := &pb.Domain{
DomainId: in.GetDomainId(),
Log: logTree,
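
For illustration only: the thread above leaves open who provisions shards beyond the first one. A minimal sketch, assuming an operator tool that is not part of this PR, of how extra shards could later be added through the QueueAdmin interface; scaleQueue and its parameters are hypothetical names:

package shardtool

import "context"

// QueueAdmin mirrors the interface this PR adds to core/adminserver.
type QueueAdmin interface {
	AddShards(ctx context.Context, domainID string, shardIDs ...int64) error
}

// scaleQueue provisions shards 2..n for a domain that was created with shard 1.
func scaleQueue(ctx context.Context, q QueueAdmin, domainID string, n int64) error {
	shardIDs := make([]int64, 0, n-1)
	for id := int64(2); id <= n; id++ {
		shardIDs = append(shardIDs, id)
	}
	return q.AddShards(ctx, domainID, shardIDs...)
}
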
12 changes: 9 additions & 3 deletions core/adminserver/admin_server_test.go
@@ -85,6 +85,12 @@ func (e *miniEnv) Close() {
e.stopMockServer()
}

type fakeQueueAdmin struct{}

func (*fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
Contributor: Can the receiver simply be taken by value?

Contributor Author: Yep, that would simplify things a bit.

return nil
}
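
A minimal sketch of the value-receiver variant suggested above; the fake carries no state, so a pointer receiver is unnecessary:

type fakeQueueAdmin struct{}

func (fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
	return nil
}

Call sites would then pass the fake by value, e.g. New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen).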

func TestCreateDomain(t *testing.T) {
for _, tc := range []struct {
desc string
@@ -171,7 +177,7 @@ func TestCreateRead(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
@@ -223,7 +229,7 @@ func TestDelete(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
@@ -287,7 +293,7 @@ func TestListDomains(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, &fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainIDs []string
7 changes: 4 additions & 3 deletions core/sequencer/sequencer_api.proto
@@ -25,7 +25,7 @@ package google.keytransparency.sequencer;
import "google/protobuf/empty.proto";

message MapMetadata {
// MapSourceSlice is the range of inputs that have been included in a map revision.
// SourceSlice is the range of inputs that have been included in a map revision.
message SourceSlice {
// lowest_watermark is the lowest primary key (exclusive) of the source log
// that has been incorporated into this map revision. The primary keys of
@@ -36,8 +36,9 @@ message MapMetadata {
// of logged items MUST be monotonically increasing.
int64 highest_watermark = 2;
}
// source defines the range of inputs used for this map revision.
SourceSlice source = 1;
reserved 1;
Contributor: Are you at a point where you can't afford breaking compatibility by simply removing this field?

Contributor Author: Fortunately we're early enough that we're happy to break compatibility. This PR also introduces backwards-incompatible database schema changes.

Contributor: Yeah, that was my point. Why use reserved 1 instead of just dropping / reassigning the tag to the new field?

Contributor Author: Good habits, I suppose :-)

// sources defines the ranges of inputs used for this map revision for each slice.
map<int64, SourceSlice> sources = 2;
}
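
To make the per-slice semantics concrete, here is a hedged sketch (not part of this PR) of how the regenerated metadata could be used to re-read exactly the inputs that produced a revision, for example when replaying into the map after a failure. Queue, mutator.QueueMessage, spb.MapMetadata and proto.Unmarshal are taken from core/sequencer/server.go below; replayRevision itself is a hypothetical helper:

func replayRevision(ctx context.Context, q Queue, domainID string, metadata []byte) ([]*mutator.QueueMessage, error) {
	var meta spb.MapMetadata
	if err := proto.Unmarshal(metadata, &meta); err != nil {
		return nil, err
	}
	var msgs []*mutator.QueueMessage
	// Walk every shard's (low, high] range recorded for this revision.
	for shardID, s := range meta.GetSources() {
		batch, err := q.ReadQueue(ctx, domainID, shardID,
			s.GetLowestWatermark(), s.GetHighestWatermark())
		if err != nil {
			return nil, err
		}
		msgs = append(msgs, batch...)
	}
	return msgs, nil
}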


78 changes: 41 additions & 37 deletions core/sequencer/sequencer_go_proto/sequencer_api.pb.go

(Generated file; diff not rendered.)

57 changes: 29 additions & 28 deletions core/sequencer/server.go
@@ -70,11 +70,10 @@ func createMetrics(mf monitoring.MetricFactory) {

// Queue reads messages that haven't been deleted off the queue.
type Queue interface {
// HighWatermark returns the highest primary key in the mutations table for DomainID.
HighWatermark(ctx context.Context, domainID string) (int64, error)
// ReadQueue returns the messages between (low, high] for domainID.
// TODO(gbelvin): Add paging API back in to support sharded reads.
ReadQueue(ctx context.Context, domainID string, low, high int64) ([]*mutator.QueueMessage, error)
// HighWatermarks returns the highest timestamp for each shard in the mutations table.
HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error)
// ReadQueue returns the messages between (low, high] for the given shard of domainID.
Contributor: Does ReadQueue delete messages from the queue (so that it's effectively a Dequeue)? If not, do I understand correctly that the same effect is achieved by rolling the timestamp window? But then is there garbage collection for the queue items somewhere (after the watermark timestamp has gone forward)?

Contributor Author: I suppose this is no longer really a Queue, because we will never delete messages from the queue. The reason is that we want to keep these messages around in order to replay them into the map in case of failure.

I'll replace Queue with Logs. Would that be clearer?

Contributor: I think yes. How about renaming the ReadQueue method to just Read, and nicely naming the interface (e.g. Logs, as you say)? I think a more verbose interface name like ShardedMutationLog or similar could be fine as well.

ReadQueue(ctx context.Context, domainID string, shard, low, high int64) ([]*mutator.QueueMessage, error)
}
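
A minimal sketch of the rename discussed above, assuming the method set stays the same; this reflects the reviewers' suggestion, not code in this diff:

// Logs reads mutations appended to a domain's per-shard logs. Entries are
// never deleted; they are retained so a map revision can be replayed after
// a failure.
type Logs interface {
	// HighWatermarks returns the highest timestamp for each shard of domainID.
	HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error)
	// Read returns the messages in (low, high] for one shard of domainID.
	Read(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error)
}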

// Server implements KeyTransparencySequencerServer.
@@ -126,7 +125,7 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.
if err := proto.Unmarshal(latestMapRoot.Metadata, &lastMeta); err != nil {
return nil, err
}
high, err := s.queue.HighWatermark(ctx, in.DomainId)
highs, err := s.queue.HighWatermarks(ctx, in.DomainId)
if err != nil {
return nil, status.Errorf(codes.Internal, "HighWatermark(): %v", err)
}
@@ -138,53 +137,55 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.
// Count items to be processed. Unfortunately, this means we will be
// reading the items to be processed twice. Once, here in RunBatch and
// again in CreateEpoch.
metadata := &spb.MapMetadata{
Source: &spb.MapMetadata_SourceSlice{
LowestWatermark: lastMeta.GetSource().GetHighestWatermark(),
sources := make(map[int64]*spb.MapMetadata_SourceSlice)
for sliceID, high := range highs {
sources[sliceID] = &spb.MapMetadata_SourceSlice{
LowestWatermark: lastMeta.Sources[sliceID].GetHighestWatermark(),
HighestWatermark: high,
},
}
}

msgs, err := s.readMessages(ctx, in.DomainId, metadata.GetSource())
msgs, err := s.readMessages(ctx, in.DomainId, sources)
if err != nil {
return nil, err
}
if int32(len(msgs)) < in.MinBatch {
if len(msgs) < int(in.MinBatch) {
return &empty.Empty{}, nil
}

return s.CreateEpoch(ctx, &spb.CreateEpochRequest{
DomainId: in.DomainId,
Revision: int64(latestMapRoot.Revision) + 1,
MapMetadata: metadata,
MapMetadata: &spb.MapMetadata{Sources: sources},
})
}

func (s *Server) readMessages(ctx context.Context, domainID string,
source *spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) {
// Read mutations
batch, err := s.queue.ReadQueue(ctx, domainID,
source.GetLowestWatermark(), source.GetHighestWatermark())
if err != nil {
return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err)
}
msgs := make([]*ktpb.EntryUpdate, 0, len(batch))
for _, m := range batch {
msgs = append(msgs, &ktpb.EntryUpdate{
Mutation: m.Mutation,
Committed: m.ExtraData,
})
sources map[int64]*spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) {
msgs := make([]*ktpb.EntryUpdate, 0)
for shardID, source := range sources {
batch, err := s.queue.ReadQueue(ctx, domainID, shardID,
source.GetLowestWatermark(), source.GetHighestWatermark())
if err != nil {
return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err)
}
for _, m := range batch {
msgs = append(msgs, &ktpb.EntryUpdate{
Mutation: m.Mutation,
Committed: m.ExtraData,
})
}
}
return msgs, nil
}

// CreateEpoch applies the supplied mutations to the current map revision and creates a new epoch.
func (s *Server) CreateEpoch(ctx context.Context, in *spb.CreateEpochRequest) (*empty.Empty, error) {
domainID := in.GetDomainId()
if in.MapMetadata.GetSource() == nil {
if in.MapMetadata.GetSources() == nil {
return nil, status.Errorf(codes.InvalidArgument, "missing map metadata")
}
msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSource())
msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSources())
if err != nil {
return nil, err
}
10 changes: 5 additions & 5 deletions impl/integration/env.go
@@ -141,7 +141,11 @@ func NewEnv(ctx context.Context) (*Env, error) {
if err != nil {
return nil, fmt.Errorf("env: failed to create domain storage: %v", err)
}
adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, vrfKeyGen)
mutations, err := mutationstorage.New(db)
if err != nil {
return nil, fmt.Errorf("env: Failed to create mutations object: %v", err)
}
adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, mutations, vrfKeyGen)
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
domainPB, err := adminSvr.CreateDomain(cctx, &pb.CreateDomainRequest{
@@ -158,10 +162,6 @@ func NewEnv(ctx context.Context) (*Env, error) {
glog.V(5).Infof("Domain: %# v", pretty.Formatter(domainPB))

// Common data structures.
mutations, err := mutationstorage.New(db)
if err != nil {
return nil, fmt.Errorf("env: Failed to create mutations object: %v", err)
}
authFunc := authentication.FakeAuthFunc
authz := &authorization.AuthzPolicy{}

9 changes: 8 additions & 1 deletion impl/sql/mutationstorage/mutations.go
@@ -50,9 +50,16 @@ var (
);`,
`CREATE TABLE IF NOT EXISTS Queue (
DomainID VARCHAR(30) NOT NULL,
ShardID BIGINT NOT NULL,
Time BIGINT NOT NULL,
Mutation BLOB NOT NULL,
PRIMARY KEY(DomainID, Time)
PRIMARY KEY(DomainID, ShardID, Time)
);`,
`CREATE TABLE IF NOT EXISTS Shards (
DomainID VARCHAR(30) NOT NULL,
ShardID BIGINT NOT NULL,
Enabled INTEGER NOT NULL,
PRIMARY KEY(DomainID, ShardID)
);`,
}
)
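
The mutationstorage implementation of AddShards falls outside this hunk. A hedged sketch of what it could look like over the new Shards table; the Mutations receiver, its db field, and the REPLACE INTO statement are assumptions, only the schema above comes from the diff:

// AddShards marks the given shard IDs as enabled for a domain (sketch only).
func (m *Mutations) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
	stmt, err := m.db.PrepareContext(ctx,
		`REPLACE INTO Shards (DomainID, ShardID, Enabled) VALUES (?, ?, ?);`)
	if err != nil {
		return err
	}
	defer stmt.Close()
	for _, id := range shardIDs {
		if _, err := stmt.ExecContext(ctx, domainID, id, 1); err != nil {
			return err
		}
	}
	return nil
}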