diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go index 5413130b..9c926222 100644 --- a/cmd/example-gcp/main.go +++ b/cmd/example-gcp/main.go @@ -83,7 +83,7 @@ func main() { defer r.Body.Close() id := sha256.Sum256(b) - idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:]))) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))() if err != nil { if errors.Is(err, tessera.ErrPushback) { w.Header().Add("Retry-After", "1") diff --git a/cmd/example-mysql/main.go b/cmd/example-mysql/main.go index 95d38264..44f7e2d8 100644 --- a/cmd/example-mysql/main.go +++ b/cmd/example-mysql/main.go @@ -92,7 +92,7 @@ func main() { klog.Warningf("/add: %v", err) } }() - idx, err := storage.Add(r.Context(), tessera.NewEntry(b)) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b))() if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) diff --git a/cmd/example-posix/main.go b/cmd/example-posix/main.go index c6b20077..fa9d9b79 100644 --- a/cmd/example-posix/main.go +++ b/cmd/example-posix/main.go @@ -142,7 +142,7 @@ func main() { klog.Warningf("/add: %v", err) } }() - idx, err := storage.Add(r.Context(), tessera.NewEntry(b)) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b))() if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) diff --git a/cmd/posix-oneshot/main.go b/cmd/posix-oneshot/main.go index 287ad5b6..ee865e39 100644 --- a/cmd/posix-oneshot/main.go +++ b/cmd/posix-oneshot/main.go @@ -25,7 +25,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "time" "golang.org/x/mod/sumdb/note" @@ -139,7 +138,7 @@ func main() { } return cp.Size, cp.Hash, nil } - st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(256, time.Second)) + st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(uint(len(toAdd)), time.Second)) // sequence entries @@ -149,41 +148,28 @@ func main() { // sequence numbers assigned to the data from the provided input files. type entryInfo struct { name string - e *tessera.Entry + f tessera.IndexFuture } entryChan := make(chan entryInfo, 100) - go func() { - for _, fp := range toAdd { - b, err := os.ReadFile(fp) - if err != nil { - klog.Exitf("Failed to read entry file %q: %q", fp, err) - } - entryChan <- entryInfo{name: fp, e: tessera.NewEntry(b)} + for _, fp := range toAdd { + b, err := os.ReadFile(fp) + if err != nil { + klog.Exitf("Failed to read entry file %q: %q", fp, err) } - close(entryChan) - }() - numWorkers := 256 - if l := len(toAdd); l < numWorkers { - numWorkers = l + // ask storage to sequence, we'll put the future we get back into the entryChan for later... + f := st.Add(ctx, tessera.NewEntry(b)) + entryChan <- entryInfo{name: fp, f: f} } + close(entryChan) - wg := sync.WaitGroup{} - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for entry := range entryChan { - // ask storage to sequence - seq, err := st.Add(context.Background(), entry.e) - if err != nil { - klog.Exitf("failed to sequence %q: %q", entry.name, err) - } - klog.Infof("%d: %v", seq, entry.name) - } - }() + for entry := range entryChan { + seq, err := entry.f() + if err != nil { + klog.Exitf("failed to sequence %q: %q", entry.name, err) + } + klog.Infof("%d: %v", seq, entry.name) } - wg.Wait() } func getKeyFile(path string) (string, error) { diff --git a/ct_only.go b/ct_only.go index 9fa8eb53..43a201b8 100644 --- a/ct_only.go +++ b/ct_only.go @@ -24,10 +24,10 @@ import ( // Storage described the expected functions from Tessera storage implementations. type Storage interface { - // Add should duably assign an index to the provided Entry, and return it. + // Add should duably assign an index to the provided Entry, returning a future to access that value. // // Implementations MUST call MarshalBundleData method on the entry before persisting/integrating it. - Add(context.Context, *Entry) (uint64, error) + Add(context.Context, *Entry) IndexFuture } // NewCertificateTransparencySequencedWriter returns a function which knows how to add a CT-specific entry type to the log. @@ -38,8 +38,8 @@ type Storage interface { // b) is not compatible with the https://c2sp.org/tlog-tiles API which we _very strongly_ encourage you to use instead. // // Returns the assigned index in the log, or an error. -func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) (uint64, error) { - return func(ctx context.Context, e *ctonly.Entry) (uint64, error) { +func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) IndexFuture { + return func(ctx context.Context, e *ctonly.Entry) IndexFuture { return s.Add(ctx, convertCTEntry(e)) } } diff --git a/integration/storage_uniformity_test.go b/integration/storage_uniformity_test.go index 1f776ae3..fa59d50a 100644 --- a/integration/storage_uniformity_test.go +++ b/integration/storage_uniformity_test.go @@ -24,7 +24,7 @@ import ( ) type StorageContract interface { - Add(ctx context.Context, entry *tessera.Entry) (uint64, error) + Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture } var ( diff --git a/log.go b/log.go index bb2c3088..37a1d35f 100644 --- a/log.go +++ b/log.go @@ -45,6 +45,12 @@ type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error) // EntriesPathFunc is the signature of a function which knows how to format entry bundle paths. type EntriesPathFunc func(n, logSize uint64) string +// IndexFuture is the signature of a function which can return an assigned index or error. +// +// Implementations of this func are likely to be "futures", or a promise to return this data at +// some point in the future, and as such will block when called if the data isn't yet available. +type IndexFuture func() (uint64, error) + // StorageOptions holds optional settings for all storage implementations. type StorageOptions struct { NewCP NewCPFunc diff --git a/personalities/sctfe/handlers.go b/personalities/sctfe/handlers.go index 6d1158c5..0fcd9ae3 100644 --- a/personalities/sctfe/handlers.go +++ b/personalities/sctfe/handlers.go @@ -334,7 +334,7 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r } klog.V(2).Infof("%s: %s => storage.Add", li.LogOrigin, method) - idx, err := li.storage.Add(ctx, entry) + idx, err := li.storage.Add(ctx, entry)() if err != nil { if errors.Is(err, tessera.ErrPushback) { w.Header().Add("Retry-After", "1") diff --git a/personalities/sctfe/handlers_test.go b/personalities/sctfe/handlers_test.go index cca6f1ff..fddb747d 100644 --- a/personalities/sctfe/handlers_test.go +++ b/personalities/sctfe/handlers_test.go @@ -294,7 +294,7 @@ func TestAddChainWhitespace(t *testing.T) { t.Run(test.descr, func(t *testing.T) { if test.want == http.StatusOK { info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, nil) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, nil }) } recorder := httptest.NewRecorder() @@ -368,7 +368,7 @@ func TestAddChain(t *testing.T) { req, leafChain := parseChain(t, false, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) } recorder := makeAddChainRequest(t, info.li, chain) @@ -457,7 +457,7 @@ func TestAddPrechain(t *testing.T) { req, leafChain := parseChain(t, true, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) } recorder := makeAddPrechainRequest(t, info.li, chain) diff --git a/personalities/sctfe/mockstorage/mock_ct_storage.go b/personalities/sctfe/mockstorage/mock_ct_storage.go index 373556f0..75509fd8 100644 --- a/personalities/sctfe/mockstorage/mock_ct_storage.go +++ b/personalities/sctfe/mockstorage/mock_ct_storage.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" x509 "github.com/google/certificate-transparency-go/x509" + tessera "github.com/transparency-dev/trillian-tessera" ctonly "github.com/transparency-dev/trillian-tessera/ctonly" ) @@ -37,12 +38,11 @@ func (m *MockStorage) EXPECT() *MockStorageMockRecorder { } // Add mocks base method. -func (m *MockStorage) Add(arg0 context.Context, arg1 *ctonly.Entry) (uint64, error) { +func (m *MockStorage) Add(arg0 context.Context, arg1 *ctonly.Entry) tessera.IndexFuture { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Add", arg0, arg1) - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(tessera.IndexFuture) + return ret0 } // Add indicates an expected call of Add. diff --git a/personalities/sctfe/storage.go b/personalities/sctfe/storage.go index b204d68b..9bd3d75c 100644 --- a/personalities/sctfe/storage.go +++ b/personalities/sctfe/storage.go @@ -36,8 +36,8 @@ const ( // Storage provides all the storage primitives necessary to write to a ct-static-api log. type Storage interface { - // Add assigns an index to the provided Entry, stages the entry for integration, and return it the assigned index. - Add(context.Context, *ctonly.Entry) (uint64, error) + // Add assigns an index to the provided Entry, stages the entry for integration, and returns a future for the assigned index. + Add(context.Context, *ctonly.Entry) tessera.IndexFuture // AddIssuerChain stores every the chain certificate in a content-addressable store under their sha256 hash. AddIssuerChain(context.Context, []*x509.Certificate) error } @@ -54,7 +54,7 @@ type IssuerStorage interface { // CTStorage implements Storage. type CTStorage struct { - storeData func(context.Context, *ctonly.Entry) (uint64, error) + storeData func(context.Context, *ctonly.Entry) tessera.IndexFuture storeIssuers func(context.Context, []KV) error } @@ -68,7 +68,7 @@ func NewCTSTorage(logStorage tessera.Storage, issuerStorage IssuerStorage) (*CTS } // Add stores CT entries. -func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) (uint64, error) { +func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) tessera.IndexFuture { // TODO(phboneff): add deduplication and chain storage return cts.storeData(ctx, entry) } diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 635c6ae7..b12ff5f5 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -166,8 +166,8 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) } // Add is the entrypoint for adding entries to a sequencing log. -func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, e)() +func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, e) } // Get returns the requested object. diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index b8b1285f..5a31faee 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -216,8 +216,8 @@ func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64 } // Add is the entrypoint for adding entries to a sequencing log. -func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, entry)() +func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, entry) } // sequenceBatch writes the entries from the provided batch into the entry bundle files of the log. diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 2f07a204..9f3a0b1b 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -249,7 +249,7 @@ func TestParallelAdd(t *testing.T) { t.Run(test.name, func(t *testing.T) { for i := 0; i < 1024; i++ { go func() { - if _, err := s.Add(ctx, tessera.NewEntry(test.entry)); err != nil { + if _, err := s.Add(ctx, tessera.NewEntry(test.entry))(); err != nil { t.Errorf("got err: %v", err) } }() @@ -280,7 +280,7 @@ func TestTileRoundTrip(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry)) + entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))() if err != nil { t.Errorf("Add got err: %v", err) } @@ -331,7 +331,7 @@ func TestEntryBundleRoundTrip(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry)) + entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))() if err != nil { t.Errorf("Add got err: %v", err) } diff --git a/storage/posix/files.go b/storage/posix/files.go index 91a6a005..3a3fd6b9 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -121,8 +121,8 @@ func (s *Storage) unlockCP() error { // Add commits to sequence numbers for an entry // Returns the sequence number assigned to the first entry in the batch, or an error. -func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, e)() +func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, e) } // GetEntryBundle retrieves the Nth entries bundle for a log of the given size. diff --git a/storage/queue.go b/storage/queue.go index 4e1e0446..0fb1337b 100644 --- a/storage/queue.go +++ b/storage/queue.go @@ -44,9 +44,6 @@ type Queue struct { inFlight map[string]*queueItem } -// Future is a function which returns an assigned log index, or an error. -type Future func() (idx uint64, err error) - // FlushFunc is the signature of a function which will receive the slice of queued entries. // Normally, this function would be provided by storage implementations. It's important to note // that the implementation MUST call each entry's MarshalBundleData function before attempting @@ -116,7 +113,7 @@ func (q *Queue) squashDupes(e *tessera.Entry) (*queueItem, bool) { } // Add places e into the queue, and returns a func which may be called to retrieve the assigned index. -func (q *Queue) Add(ctx context.Context, e *tessera.Entry) Future { +func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { entry, isDupe := q.squashDupes(e) if isDupe { // This entry is already in the queue, so no need to add it again. @@ -154,15 +151,15 @@ func (q *Queue) doFlush(ctx context.Context, entries []*queueItem) { // hang until assign is called. type queueItem struct { entry *tessera.Entry - c chan Future - f Future + c chan tessera.IndexFuture + f tessera.IndexFuture } // newEntry creates a new entry for the provided data. func newEntry(data *tessera.Entry) *queueItem { e := &queueItem{ entry: data, - c: make(chan Future, 1), + c: make(chan tessera.IndexFuture, 1), } e.f = sync.OnceValues(func() (uint64, error) { return (<-e.c)() diff --git a/storage/queue_test.go b/storage/queue_test.go index b58ca598..30abadc5 100644 --- a/storage/queue_test.go +++ b/storage/queue_test.go @@ -74,7 +74,7 @@ func TestQueue(t *testing.T) { q := storage.NewQueue(ctx, test.maxWait, uint(test.maxEntries), flushFunc) // Now submit a bunch of entries - adds := make([]storage.Future, test.numItems) + adds := make([]tessera.IndexFuture, test.numItems) wantEntries := make([]*tessera.Entry, test.numItems) for i := uint64(0); i < test.numItems; i++ { d := []byte(fmt.Sprintf("item %d", i)) @@ -109,7 +109,7 @@ func TestDedup(t *testing.T) { }) numEntries := 10 - adds := []storage.Future{} + adds := []tessera.IndexFuture{} for i := 0; i < numEntries; i++ { adds = append(adds, q.Add(ctx, tessera.NewEntry([]byte("Have I seen this before?")))) }