
Scale mutation queue with multiple shards #1048

Merged · 11 commits · Oct 11, 2018
2 changes: 1 addition & 1 deletion cmd/keytransparency-sequencer/main.go
@@ -113,7 +113,7 @@ func main() {
keygen := func(ctx context.Context, spec *keyspb.Specification) (proto.Message, error) {
return der.NewProtoFromSpec(spec)
}
- adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, keygen)
+ adminServer := adminserver.New(tlog, tmap, logAdmin, mapAdmin, domainStorage, mutations, keygen)
glog.Infof("Signer starting")

// Run servers
42 changes: 30 additions & 12 deletions core/adminserver/admin_server.go
@@ -75,14 +75,21 @@
}
)

+ // QueueAdmin controls the lifecycle and scaling of mutation queues.
+ type QueueAdmin interface {
+ // AddShards creates new shards for writing to a domain's mutation queue.
+ AddShards(ctx context.Context, domainID string, shardIDs ...int64) error
+ }

// Server implements pb.KeyTransparencyAdminServer
type Server struct {
tlog tpb.TrillianLogClient
tmap tpb.TrillianMapClient
logAdmin tpb.TrillianAdminClient
mapAdmin tpb.TrillianAdminClient
domains domain.Storage
+ queueAdmin QueueAdmin
keygen keys.ProtoGenerator
}

// New returns a KeyTransparencyAdmin implementation.
@@ -91,15 +98,17 @@ func New(
tmap tpb.TrillianMapClient,
logAdmin, mapAdmin tpb.TrillianAdminClient,
domains domain.Storage,
+ queueAdmin QueueAdmin,
keygen keys.ProtoGenerator,
) *Server {
return &Server{
tlog: tlog,
tmap: tmap,
logAdmin: logAdmin,
mapAdmin: mapAdmin,
domains: domains,
+ queueAdmin: queueAdmin,
keygen: keygen,
}
}

@@ -249,6 +258,7 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
err, logTree.TreeId, delLogErr, mapTree.TreeId, delMapErr)
}

+ // Create domain - {log, map} binding.
if err := s.domains.Write(ctx, &domain.Domain{
DomainID: in.GetDomainId(),
MapID: mapTree.TreeId,
@@ -260,6 +270,14 @@ func (s *Server) CreateDomain(ctx context.Context, in *pb.CreateDomainRequest) (
}); err != nil {
return nil, fmt.Errorf("adminserver: domains.Write(): %v", err)
}

+ // Create initial shards for queue.
+ // TODO(#1063): Additional shards can be added at a later point to support increased server load.
+ shardIDs := []int64{1, 2}
+ if err := s.queueAdmin.AddShards(ctx, in.GetDomainId(), shardIDs...); err != nil {
+ return nil, fmt.Errorf("adminserver: AddShards(%v): %v", shardIDs, err)
+ }

d := &pb.Domain{
DomainId: in.GetDomainId(),
Log: logTree,
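A note for readers following along: the concrete QueueAdmin wired in by this PR is the MySQL-backed mutationstorage, but the interface is small enough to fake. A minimal in-memory sketch, assuming only the interface above (memQueueAdmin and its fields are hypothetical, not part of this PR):

```go
import (
	"context"
	"sync"
)

// memQueueAdmin is a toy, thread-safe QueueAdmin for local experiments.
type memQueueAdmin struct {
	mu     sync.Mutex
	shards map[string][]int64 // domainID -> shard IDs
}

// AddShards records new shard IDs for a domain. The real mutationstorage
// implementation persists shard rows to MySQL instead.
func (a *memQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.shards == nil {
		a.shards = make(map[string][]int64)
	}
	a.shards[domainID] = append(a.shards[domainID], shardIDs...)
	return nil
}
```

With something like this, CreateDomain's new AddShards call can be exercised without a database, much like the fakeQueueAdmin in the test file below.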
12 changes: 9 additions & 3 deletions core/adminserver/admin_server_test.go
@@ -85,6 +85,12 @@ func (e *miniEnv) Close() {
e.stopMockServer()
}

+ type fakeQueueAdmin struct{}
+
+ func (fakeQueueAdmin) AddShards(ctx context.Context, domainID string, shardIDs ...int64) error {
+ return nil
+ }

func TestCreateDomain(t *testing.T) {
for _, tc := range []struct {
desc string
@@ -171,7 +177,7 @@ func TestCreateRead(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

- svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
+ svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
@@ -223,7 +229,7 @@ func TestDelete(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

- svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
+ svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainID string
@@ -287,7 +293,7 @@ func TestListDomains(t *testing.T) {
t.Fatalf("Failed to create trillian log server: %v", err)
}

- svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, vrfKeyGen)
+ svr := New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, storage, fakeQueueAdmin{}, vrfKeyGen)

for _, tc := range []struct {
domainIDs []string
14 changes: 8 additions & 6 deletions core/sequencer/sequencer_api.proto
@@ -25,19 +25,21 @@ package google.keytransparency.sequencer;
import "google/protobuf/empty.proto";

message MapMetadata {
- // MapSourceSlice is the range of inputs that have been included in a map revision.
+ // SourceSlice is the range of inputs that have been included in a map revision.
message SourceSlice {
- // lowest_watermark is the lowest primary key (exclusive) the source log
- // that has been incorporated into this map revision. The primary keys of
- // logged items MUST be monotonically increasing.
+ // lowest_watermark is the lowest primary key (exclusive) of the source
+ // log that has been incorporated into this map revision. The primary
+ // keys of logged items MUST be monotonically increasing.
int64 lowest_watermark = 1;
// highest_watermark is the highest primary key (inclusive) of the source
// log that has been incorporated into this map revision. The primary keys
// of logged items MUST be monotonically increasing.
int64 highest_watermark = 2;
}
- // source defines the range of inputs used for this map revision.
- SourceSlice source = 1;
+ reserved 1;
Contributor: Are you at a point where you can't afford breaking compatibility by simply removing this field?

Contributor Author: Fortunately we're early enough that we're happy to break compatibility. This PR is also introducing backwards-incompatible database schema changes.

Contributor: Yeah, that was my point. Why use `reserved 1` instead of just dropping / reassigning the tag to the new field?

Contributor Author: Good habits I suppose :-)

+ // sources is a map from log source IDs to the (low, high] range of primary keys
+ // in each slice that were used to construct this map revision.
+ map<int64, SourceSlice> sources = 2;
}


78 changes: 41 additions & 37 deletions core/sequencer/sequencer_go_proto/sequencer_api.pb.go

(Generated file; diff not rendered.)
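Since the regenerated bindings aren't rendered, here is roughly what the proto change does to the generated Go type (an abbreviated sketch, not the verbatim generated code):

```go
type MapMetadata struct {
	// Before this PR: a single range under field tag 1, now reserved.
	//   Source *MapMetadata_SourceSlice
	// After: one range per source log, keyed by shard/source ID (tag 2).
	Sources map[int64]*MapMetadata_SourceSlice
}
```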

58 changes: 30 additions & 28 deletions core/sequencer/server.go
@@ -70,11 +70,11 @@ func createMetrics(mf monitoring.MetricFactory) {

// Queue reads messages that haven't been deleted off the queue.
type Queue interface {
- // HighWatermark returns the highest primary key in the mutations table for DomainID.
- HighWatermark(ctx context.Context, domainID string) (int64, error)
- // ReadQueue returns the messages between (low, high] for domainID.
- // TODO(gbelvin): Add paging API back in to support sharded reads.
- ReadQueue(ctx context.Context, domainID string, low, high int64) ([]*mutator.QueueMessage, error)
+ // HighWatermarks returns the highest primary key for each shard in the mutations table.
+ HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error)
+ // ReadQueue returns the messages under shardID in the (low, high] range.
+ // ReadQueue does NOT delete messages.
+ ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error)
}
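A test double for the widened interface makes the (low, high] contract concrete. A sketch, assuming primary keys start at 1 so that a watermark of 0 means "nothing consumed yet" (fakeQueue is hypothetical, not part of this PR):

```go
import (
	"context"

	"github.com/google/keytransparency/core/mutator"
)

type fakeQueue struct {
	// msgs[shardID][i] holds the message with primary key i+1.
	msgs map[int64][]*mutator.QueueMessage
}

// HighWatermarks reports the largest primary key per shard.
func (q fakeQueue) HighWatermarks(ctx context.Context, domainID string) (map[int64]int64, error) {
	highs := make(map[int64]int64)
	for shardID, m := range q.msgs {
		highs[shardID] = int64(len(m))
	}
	return highs, nil
}

// ReadQueue returns messages with primary keys in (low, high]; it never
// deletes anything.
func (q fakeQueue) ReadQueue(ctx context.Context, domainID string, shardID, low, high int64) ([]*mutator.QueueMessage, error) {
	// Exclusive low, inclusive high: slice indexes low..high-1 hold
	// primary keys low+1..high.
	return q.msgs[shardID][low:high], nil
}
```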

// Server implements KeyTransparencySequencerServer.
@@ -126,7 +126,7 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.Empty, error) {
if err := proto.Unmarshal(latestMapRoot.Metadata, &lastMeta); err != nil {
return nil, err
}
- high, err := s.queue.HighWatermark(ctx, in.DomainId)
+ highs, err := s.queue.HighWatermarks(ctx, in.DomainId)
if err != nil {
return nil, status.Errorf(codes.Internal, "HighWatermark(): %v", err)
}
@@ -138,53 +138,55 @@ func (s *Server) RunBatch(ctx context.Context, in *spb.RunBatchRequest) (*empty.Empty, error) {
// Count items to be processed. Unfortunately, this means we will be
// reading the items to be processed twice. Once, here in RunBatch and
// again in CreateEpoch.
- metadata := &spb.MapMetadata{
- Source: &spb.MapMetadata_SourceSlice{
- LowestWatermark: lastMeta.GetSource().GetHighestWatermark(),
+ sources := make(map[int64]*spb.MapMetadata_SourceSlice)
+ for sliceID, high := range highs {
+ sources[sliceID] = &spb.MapMetadata_SourceSlice{
+ LowestWatermark: lastMeta.Sources[sliceID].GetHighestWatermark(),
HighestWatermark: high,
- },
+ }
}

- msgs, err := s.readMessages(ctx, in.DomainId, metadata.GetSource())
+ msgs, err := s.readMessages(ctx, in.DomainId, sources)
if err != nil {
return nil, err
}
- if int32(len(msgs)) < in.MinBatch {
+ if len(msgs) < int(in.MinBatch) {
return &empty.Empty{}, nil
}

return s.CreateEpoch(ctx, &spb.CreateEpochRequest{
DomainId: in.DomainId,
Revision: int64(latestMapRoot.Revision) + 1,
- MapMetadata: metadata,
+ MapMetadata: &spb.MapMetadata{Sources: sources},
})
}
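To make the watermark bookkeeping concrete: each revision covers, per shard, the range (previous high, current high]. The loop above, extracted as a standalone helper (nextSources is hypothetical):

```go
import spb "github.com/google/keytransparency/core/sequencer/sequencer_go_proto"

func nextSources(last map[int64]*spb.MapMetadata_SourceSlice, highs map[int64]int64) map[int64]*spb.MapMetadata_SourceSlice {
	next := make(map[int64]*spb.MapMetadata_SourceSlice)
	for id, high := range highs {
		// last[id] is nil for a shard first seen this revision; the
		// generated getter is nil-safe and returns 0, so a new shard
		// starts at (0, high].
		next[id] = &spb.MapMetadata_SourceSlice{
			LowestWatermark:  last[id].GetHighestWatermark(),
			HighestWatermark: high,
		}
	}
	return next
}
```

For example, with last = {1: (0, 7]} and fresh highs = {1: 9, 2: 4}, the next revision covers {1: (7, 9], 2: (0, 4]}: shard 1 resumes where it left off and the new shard 2 starts from zero.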

func (s *Server) readMessages(ctx context.Context, domainID string,
- source *spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) {
- // Read mutations
- batch, err := s.queue.ReadQueue(ctx, domainID,
- source.GetLowestWatermark(), source.GetHighestWatermark())
- if err != nil {
- return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err)
- }
- msgs := make([]*ktpb.EntryUpdate, 0, len(batch))
- for _, m := range batch {
- msgs = append(msgs, &ktpb.EntryUpdate{
- Mutation: m.Mutation,
- Committed: m.ExtraData,
- })
+ sources map[int64]*spb.MapMetadata_SourceSlice) ([]*ktpb.EntryUpdate, error) {
+ msgs := make([]*ktpb.EntryUpdate, 0)
+ for shardID, source := range sources {
+ batch, err := s.queue.ReadQueue(ctx, domainID, shardID,
+ source.GetLowestWatermark(), source.GetHighestWatermark())
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "ReadQueue(): %v", err)
+ }
+ for _, m := range batch {
+ msgs = append(msgs, &ktpb.EntryUpdate{
+ Mutation: m.Mutation,
+ Committed: m.ExtraData,
+ })
+ }
+ }
return msgs, nil
}

// CreateEpoch applies the supplied mutations to the current map revision and creates a new epoch.
func (s *Server) CreateEpoch(ctx context.Context, in *spb.CreateEpochRequest) (*empty.Empty, error) {
domainID := in.GetDomainId()
- if in.MapMetadata.GetSource() == nil {
+ if in.MapMetadata.GetSources() == nil {
return nil, status.Errorf(codes.InvalidArgument, "missing map metadata")
}
- msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSource())
+ msgs, err := s.readMessages(ctx, in.DomainId, in.MapMetadata.GetSources())
if err != nil {
return nil, err
}
10 changes: 5 additions & 5 deletions impl/integration/env.go
@@ -141,7 +141,11 @@ func NewEnv(ctx context.Context) (*Env, error) {
if err != nil {
return nil, fmt.Errorf("env: failed to create domain storage: %v", err)
}
- adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, vrfKeyGen)
+ mutations, err := mutationstorage.New(db)
+ if err != nil {
+ return nil, fmt.Errorf("env: Failed to create mutations object: %v", err)
+ }
+ adminSvr := adminserver.New(logEnv.Log, mapEnv.Map, logEnv.Admin, mapEnv.Admin, domainStorage, mutations, vrfKeyGen)
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
domainPB, err := adminSvr.CreateDomain(cctx, &pb.CreateDomainRequest{
@@ -158,10 +162,6 @@ func NewEnv(ctx context.Context) (*Env, error) {
glog.V(5).Infof("Domain: %# v", pretty.Formatter(domainPB))

// Common data structures.
- mutations, err := mutationstorage.New(db)
- if err != nil {
- return nil, fmt.Errorf("env: Failed to create mutations object: %v", err)
- }
authFunc := authentication.FakeAuthFunc
authz := &authorization.AuthzPolicy{}

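Note on ordering: mutationstorage.New now has to run before adminserver.New, because CreateDomain immediately calls AddShards on the queue. That is why the mutations setup moved up here instead of staying with the other common data structures below.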