Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Move the responsibility to pick an input log from storage to the frontend #1376

Merged
merged 5 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/integration/storagetest/mutation_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (mutationLogsTests) TestReadLog(ctx context.Context, t *testing.T, newForTe
// Write ten batches, three entries each.
for i := byte(0); i < 10; i++ {
entry := &pb.EntryUpdate{Mutation: &pb.SignedEntry{Entry: mustMarshal(t, &pb.Entry{Index: []byte{i}})}}
if _, _, err := m.Send(ctx, directoryID, entry, entry, entry); err != nil {
if _, err := m.Send(ctx, directoryID, logID, entry, entry, entry); err != nil {
t.Fatalf("Send(): %v", err)
}
}
Expand Down
30 changes: 25 additions & 5 deletions core/keyserver/keyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package keyserver
import (
"context"
"fmt"
"math/rand"
"runtime"
"sync"
"time"
Expand Down Expand Up @@ -67,16 +68,18 @@ func createMetrics(mf monitoring.MetricFactory) {

// MutationLogs provides sets of time ordered message logs.
type MutationLogs interface {
// Send submits the whole group of mutations atomically to a random log.
// Send submits the whole group of mutations atomically to a given log.
// TODO(gbelvin): Create a batch level object to make it clear that this a batch of updates.
// Returns the logID and timestamp that the mutation batch got written at.
Send(ctx context.Context, directoryID string, mutation ...*pb.EntryUpdate) (int64, time.Time, error)
// Returns the timestamp that the mutation batch got written at.
Send(ctx context.Context, directoryID string, logID int64, mutation ...*pb.EntryUpdate) (time.Time, error)
// ReadLog returns the messages in the (low, high] range stored in the
// specified log. ReadLog always returns complete units of the original
// batches sent via Send, and will return more items than limit if
// needed to do so.
ReadLog(ctx context.Context, directoryID string, logID int64, low, high time.Time,
limit int32) ([]*mutator.LogMessage, error)
// ListLogs returns a list of logs, optionally filtered by the writable bit.
ListLogs(ctx context.Context, directoryID string, writable bool) ([]int64, error)
}

// BatchReader reads batch definitions.
Expand Down Expand Up @@ -655,8 +658,13 @@ func (s *Server) BatchQueueUserUpdate(ctx context.Context, in *pb.BatchQueueUser
}
tdone()

// Save mutation to the database.
wmLogID, wmTime, err := s.logs.Send(ctx, directory.DirectoryID, in.Updates...)
// Pick a random logID. Note, this effectively picks a random QoS. See issue #1377.
// TODO(gbelvin): Define an explicit QoS / Load ballancing API.
wmLogID, err := s.randLog(ctx, directory.DirectoryID)
gdbelvin marked this conversation as resolved.
Show resolved Hide resolved
if st := status.Convert(err); st.Code() != codes.OK {
return nil, status.Errorf(st.Code(), "Could not pick a log to write to: %v", err)
}
wmTime, err := s.logs.Send(ctx, directory.DirectoryID, wmLogID, in.Updates...)
if st := status.Convert(err); st.Code() != codes.OK {
glog.Errorf("mutations.Write failed: %v", err)
return nil, status.Errorf(st.Code(), "Mutation write error")
Expand All @@ -666,6 +674,18 @@ func (s *Server) BatchQueueUserUpdate(ctx context.Context, in *pb.BatchQueueUser
return &empty.Empty{}, nil
}

func (s *Server) randLog(ctx context.Context, directoryID string) (int64, error) {
// TODO(gbelvin): Cache these results.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How badly does this read harm the write performance?

writable := true
logIDs, err := s.logs.ListLogs(ctx, directoryID, writable)
mhutchinson marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return 0, err
}

// Return a random log.
return logIDs[rand.Intn(len(logIDs))], nil
}

// GetDirectory returns all info tied to the specified directory.
//
// This API to get all necessary data needed to verify a particular
Expand Down
12 changes: 10 additions & 2 deletions core/keyserver/revisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,16 @@ func (b batchStorage) ReadBatch(ctx context.Context, dirID string, rev int64) (*

type mutations map[int64][]*mutator.LogMessage // Map of logID to Slice of LogMessages

func (m *mutations) Send(ctx context.Context, dirID string, mutation ...*pb.EntryUpdate) (int64, time.Time, error) {
return 0, time.Time{}, errors.New("unimplemented")
func (m *mutations) Send(ctx context.Context, dirID string, _ int64, mutation ...*pb.EntryUpdate) (time.Time, error) {
return time.Time{}, errors.New("unimplemented")
}

func (m *mutations) ListLogs(ctx context.Context, dirID string, _ bool) ([]int64, error) {
logIDs := []int64{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could pre-allocate len(*m) items here.

for id := range *m {
logIDs = append(logIDs, id)
}
return logIDs, nil
}

func (m *mutations) ReadLog(ctx context.Context, dirID string,
Expand Down
28 changes: 5 additions & 23 deletions impl/sql/mutationstorage/mutation_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mutationstorage
import (
"context"
"database/sql"
"math/rand"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -66,30 +65,26 @@ func (m *Mutations) AddLogs(ctx context.Context, directoryID string, logIDs ...i
// Send writes mutations to the leading edge (by sequence number) of the mutations table.
// Returns the logID/watermark pair that was written, or nil if nothing was written.
// TODO(gbelvin): Make updates a slice.
func (m *Mutations) Send(ctx context.Context, directoryID string, updates ...*pb.EntryUpdate) (int64, time.Time, error) {
func (m *Mutations) Send(ctx context.Context, directoryID string, logID int64, updates ...*pb.EntryUpdate) (time.Time, error) {
glog.Infof("mutationstorage: Send(%v, <mutation>)", directoryID)
if len(updates) == 0 {
return 0, time.Time{}, nil
}
logID, err := m.randLog(ctx, directoryID)
if err != nil {
return 0, time.Time{}, err
return time.Time{}, nil
}
updateData := make([][]byte, 0, len(updates))
for _, u := range updates {
data, err := proto.Marshal(u)
if err != nil {
return 0, time.Time{}, err
return time.Time{}, err
}
updateData = append(updateData, data)
}
// TODO(gbelvin): Implement retry with backoff for retryable errors if
// we get timestamp contention.
ts := time.Now()
if err := m.send(ctx, ts, directoryID, logID, updateData...); err != nil {
return 0, time.Time{}, err
return time.Time{}, err
}
return logID, ts, nil
return ts, nil
}

// ListLogs returns a list of all logs for directoryID, optionally filtered for writable logs.
Expand Down Expand Up @@ -123,19 +118,6 @@ func (m *Mutations) ListLogs(ctx context.Context, directoryID string, writable b
return logIDs, nil
}

// randLog returns a random, enabled log for directoryID.
func (m *Mutations) randLog(ctx context.Context, directoryID string) (int64, error) {
gdbelvin marked this conversation as resolved.
Show resolved Hide resolved
// TODO(gbelvin): Cache these results.
writable := true
logIDs, err := m.ListLogs(ctx, directoryID, writable)
if err != nil {
return 0, err
}

// Return a random log.
return logIDs[rand.Intn(len(logIDs))], nil
}

// ts must be greater than all other timestamps currently recorded for directoryID.
func (m *Mutations) send(ctx context.Context, ts time.Time, directoryID string,
logID int64, mData ...[]byte) (ret error) {
Expand Down
42 changes: 1 addition & 41 deletions impl/sql/mutationstorage/mutation_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/keytransparency/core/adminserver"
"github.com/google/keytransparency/core/integration/storagetest"
"github.com/google/keytransparency/core/keyserver"
Expand Down Expand Up @@ -57,45 +56,6 @@ func TestLogsAdminIntegration(t *testing.T) {
})
}

func TestRandLog(t *testing.T) {
mhutchinson marked this conversation as resolved.
Show resolved Hide resolved
ctx := context.Background()
directoryID := "TestRandLog"

for _, tc := range []struct {
desc string
send []int64
wantCode codes.Code
wantLogs map[int64]bool
}{
{desc: "no rows", wantCode: codes.NotFound, wantLogs: map[int64]bool{}},
{desc: "one row", send: []int64{10}, wantLogs: map[int64]bool{10: true}},
{desc: "second", send: []int64{1, 2, 3}, wantLogs: map[int64]bool{
1: true,
2: true,
3: true,
}},
} {
t.Run(tc.desc, func(t *testing.T) {
m, done := newForTest(ctx, t, directoryID, tc.send...)
defer done(ctx)
logs := make(map[int64]bool)
for i := 0; i < 10*len(tc.wantLogs); i++ {
logID, err := m.randLog(ctx, directoryID)
if got, want := status.Code(err), tc.wantCode; got != want {
t.Errorf("randLog(): %v, want %v", got, want)
}
if err != nil {
break
}
logs[logID] = true
}
if got, want := logs, tc.wantLogs; !cmp.Equal(got, want) {
t.Errorf("logs: %v, want %v", got, want)
}
})
}
}

func BenchmarkSend(b *testing.B) {
ctx := context.Background()
directoryID := "BenchmarkSend"
Expand Down Expand Up @@ -123,7 +83,7 @@ func BenchmarkSend(b *testing.B) {
updates = append(updates, update)
}
for n := 0; n < b.N; n++ {
if _, _, err := m.Send(ctx, directoryID, updates...); err != nil {
if _, err := m.Send(ctx, directoryID, logID, updates...); err != nil {
b.Errorf("Send(): %v", err)
}
}
Expand Down