From 22f1bcf26d11d538c7c4b47214cbc89a9da56cf5 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 15:57:48 +0000 Subject: [PATCH 01/10] Return futures --- log.go | 3 +++ storage/gcp/gcp.go | 4 ++-- storage/queue.go | 11 ++++------- storage/queue_test.go | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/log.go b/log.go index bb2c3088..26c37efa 100644 --- a/log.go +++ b/log.go @@ -45,6 +45,9 @@ type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error) // EntriesPathFunc is the signature of a function which knows how to format entry bundle paths. type EntriesPathFunc func(n, logSize uint64) string +// IndexFuture is the signature of a function which can return an assigned index or error. +type IndexFuture func() (uint64, error) + // StorageOptions holds optional settings for all storage implementations. type StorageOptions struct { NewCP NewCPFunc diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 635c6ae7..b12ff5f5 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -166,8 +166,8 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) } // Add is the entrypoint for adding entries to a sequencing log. -func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, e)() +func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, e) } // Get returns the requested object. diff --git a/storage/queue.go b/storage/queue.go index 4e1e0446..0fb1337b 100644 --- a/storage/queue.go +++ b/storage/queue.go @@ -44,9 +44,6 @@ type Queue struct { inFlight map[string]*queueItem } -// Future is a function which returns an assigned log index, or an error. -type Future func() (idx uint64, err error) - // FlushFunc is the signature of a function which will receive the slice of queued entries. // Normally, this function would be provided by storage implementations. It's important to note // that the implementation MUST call each entry's MarshalBundleData function before attempting @@ -116,7 +113,7 @@ func (q *Queue) squashDupes(e *tessera.Entry) (*queueItem, bool) { } // Add places e into the queue, and returns a func which may be called to retrieve the assigned index. -func (q *Queue) Add(ctx context.Context, e *tessera.Entry) Future { +func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { entry, isDupe := q.squashDupes(e) if isDupe { // This entry is already in the queue, so no need to add it again. @@ -154,15 +151,15 @@ func (q *Queue) doFlush(ctx context.Context, entries []*queueItem) { // hang until assign is called. type queueItem struct { entry *tessera.Entry - c chan Future - f Future + c chan tessera.IndexFuture + f tessera.IndexFuture } // newEntry creates a new entry for the provided data. func newEntry(data *tessera.Entry) *queueItem { e := &queueItem{ entry: data, - c: make(chan Future, 1), + c: make(chan tessera.IndexFuture, 1), } e.f = sync.OnceValues(func() (uint64, error) { return (<-e.c)() diff --git a/storage/queue_test.go b/storage/queue_test.go index b58ca598..30abadc5 100644 --- a/storage/queue_test.go +++ b/storage/queue_test.go @@ -74,7 +74,7 @@ func TestQueue(t *testing.T) { q := storage.NewQueue(ctx, test.maxWait, uint(test.maxEntries), flushFunc) // Now submit a bunch of entries - adds := make([]storage.Future, test.numItems) + adds := make([]tessera.IndexFuture, test.numItems) wantEntries := make([]*tessera.Entry, test.numItems) for i := uint64(0); i < test.numItems; i++ { d := []byte(fmt.Sprintf("item %d", i)) @@ -109,7 +109,7 @@ func TestDedup(t *testing.T) { }) numEntries := 10 - adds := []storage.Future{} + adds := []tessera.IndexFuture{} for i := 0; i < numEntries; i++ { adds = append(adds, q.Add(ctx, tessera.NewEntry([]byte("Have I seen this before?")))) } From 47fb51738c4fa469c00b01c4d48ae5c08db83407 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 15:58:26 +0000 Subject: [PATCH 02/10] POSIX --- storage/posix/files.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/posix/files.go b/storage/posix/files.go index 91a6a005..3a3fd6b9 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -121,8 +121,8 @@ func (s *Storage) unlockCP() error { // Add commits to sequence numbers for an entry // Returns the sequence number assigned to the first entry in the batch, or an error. -func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, e)() +func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, e) } // GetEntryBundle retrieves the Nth entries bundle for a log of the given size. From 6d3e77873e670e66c397727ca6343f519da04bd0 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 15:58:49 +0000 Subject: [PATCH 03/10] MySQL --- storage/mysql/mysql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index b8b1285f..5a31faee 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -216,8 +216,8 @@ func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64 } // Add is the entrypoint for adding entries to a sequencing log. -func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) { - return s.queue.Add(ctx, entry)() +func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture { + return s.queue.Add(ctx, entry) } // sequenceBatch writes the entries from the provided batch into the entry bundle files of the log. From bc9337317b0be27b6a57b1f237865597c9b7d792 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 15:59:30 +0000 Subject: [PATCH 04/10] GCP f --- cmd/example-gcp/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/example-gcp/main.go b/cmd/example-gcp/main.go index 5413130b..9c926222 100644 --- a/cmd/example-gcp/main.go +++ b/cmd/example-gcp/main.go @@ -83,7 +83,7 @@ func main() { defer r.Body.Close() id := sha256.Sum256(b) - idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:]))) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))() if err != nil { if errors.Is(err, tessera.ErrPushback) { w.Header().Add("Retry-After", "1") From 15cf0cedde8f086730d47eb35b260cd20bdabe87 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 16:06:53 +0000 Subject: [PATCH 05/10] POSIX-oneshot --- cmd/posix-oneshot/main.go | 46 ++++++++++++++------------------------- 1 file changed, 16 insertions(+), 30 deletions(-) diff --git a/cmd/posix-oneshot/main.go b/cmd/posix-oneshot/main.go index 287ad5b6..ee865e39 100644 --- a/cmd/posix-oneshot/main.go +++ b/cmd/posix-oneshot/main.go @@ -25,7 +25,6 @@ import ( "fmt" "os" "path/filepath" - "sync" "time" "golang.org/x/mod/sumdb/note" @@ -139,7 +138,7 @@ func main() { } return cp.Size, cp.Hash, nil } - st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(256, time.Second)) + st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(uint(len(toAdd)), time.Second)) // sequence entries @@ -149,41 +148,28 @@ func main() { // sequence numbers assigned to the data from the provided input files. type entryInfo struct { name string - e *tessera.Entry + f tessera.IndexFuture } entryChan := make(chan entryInfo, 100) - go func() { - for _, fp := range toAdd { - b, err := os.ReadFile(fp) - if err != nil { - klog.Exitf("Failed to read entry file %q: %q", fp, err) - } - entryChan <- entryInfo{name: fp, e: tessera.NewEntry(b)} + for _, fp := range toAdd { + b, err := os.ReadFile(fp) + if err != nil { + klog.Exitf("Failed to read entry file %q: %q", fp, err) } - close(entryChan) - }() - numWorkers := 256 - if l := len(toAdd); l < numWorkers { - numWorkers = l + // ask storage to sequence, we'll put the future we get back into the entryChan for later... + f := st.Add(ctx, tessera.NewEntry(b)) + entryChan <- entryInfo{name: fp, f: f} } + close(entryChan) - wg := sync.WaitGroup{} - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for entry := range entryChan { - // ask storage to sequence - seq, err := st.Add(context.Background(), entry.e) - if err != nil { - klog.Exitf("failed to sequence %q: %q", entry.name, err) - } - klog.Infof("%d: %v", seq, entry.name) - } - }() + for entry := range entryChan { + seq, err := entry.f() + if err != nil { + klog.Exitf("failed to sequence %q: %q", entry.name, err) + } + klog.Infof("%d: %v", seq, entry.name) } - wg.Wait() } func getKeyFile(path string) (string, error) { From b267942d716f9cad70bf66717630de3b0da6f21e Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 16:03:35 +0000 Subject: [PATCH 06/10] MySQL f --- cmd/example-mysql/main.go | 2 +- storage/mysql/mysql_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/example-mysql/main.go b/cmd/example-mysql/main.go index 95d38264..44f7e2d8 100644 --- a/cmd/example-mysql/main.go +++ b/cmd/example-mysql/main.go @@ -92,7 +92,7 @@ func main() { klog.Warningf("/add: %v", err) } }() - idx, err := storage.Add(r.Context(), tessera.NewEntry(b)) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b))() if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 2f07a204..9f3a0b1b 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -249,7 +249,7 @@ func TestParallelAdd(t *testing.T) { t.Run(test.name, func(t *testing.T) { for i := 0; i < 1024; i++ { go func() { - if _, err := s.Add(ctx, tessera.NewEntry(test.entry)); err != nil { + if _, err := s.Add(ctx, tessera.NewEntry(test.entry))(); err != nil { t.Errorf("got err: %v", err) } }() @@ -280,7 +280,7 @@ func TestTileRoundTrip(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry)) + entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))() if err != nil { t.Errorf("Add got err: %v", err) } @@ -331,7 +331,7 @@ func TestEntryBundleRoundTrip(t *testing.T) { }, } { t.Run(test.name, func(t *testing.T) { - entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry)) + entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))() if err != nil { t.Errorf("Add got err: %v", err) } From 18e3f3dbb00582a23c0e8d29f0aeb9893b269a40 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 16:09:07 +0000 Subject: [PATCH 07/10] Integration test --- integration/storage_uniformity_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/storage_uniformity_test.go b/integration/storage_uniformity_test.go index 1f776ae3..fa59d50a 100644 --- a/integration/storage_uniformity_test.go +++ b/integration/storage_uniformity_test.go @@ -24,7 +24,7 @@ import ( ) type StorageContract interface { - Add(ctx context.Context, entry *tessera.Entry) (uint64, error) + Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture } var ( From bd0e1c394cf7f1805ad4392641c391de1eeb62f0 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 28 Aug 2024 16:23:02 +0000 Subject: [PATCH 08/10] SCTFE --- ct_only.go | 8 ++++---- personalities/sctfe/handlers.go | 2 +- personalities/sctfe/handlers_test.go | 6 +++--- personalities/sctfe/mockstorage/mock_ct_storage.go | 8 ++++---- personalities/sctfe/storage.go | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/ct_only.go b/ct_only.go index 9fa8eb53..43a201b8 100644 --- a/ct_only.go +++ b/ct_only.go @@ -24,10 +24,10 @@ import ( // Storage described the expected functions from Tessera storage implementations. type Storage interface { - // Add should duably assign an index to the provided Entry, and return it. + // Add should duably assign an index to the provided Entry, returning a future to access that value. // // Implementations MUST call MarshalBundleData method on the entry before persisting/integrating it. - Add(context.Context, *Entry) (uint64, error) + Add(context.Context, *Entry) IndexFuture } // NewCertificateTransparencySequencedWriter returns a function which knows how to add a CT-specific entry type to the log. @@ -38,8 +38,8 @@ type Storage interface { // b) is not compatible with the https://c2sp.org/tlog-tiles API which we _very strongly_ encourage you to use instead. // // Returns the assigned index in the log, or an error. -func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) (uint64, error) { - return func(ctx context.Context, e *ctonly.Entry) (uint64, error) { +func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) IndexFuture { + return func(ctx context.Context, e *ctonly.Entry) IndexFuture { return s.Add(ctx, convertCTEntry(e)) } } diff --git a/personalities/sctfe/handlers.go b/personalities/sctfe/handlers.go index 6d1158c5..0fcd9ae3 100644 --- a/personalities/sctfe/handlers.go +++ b/personalities/sctfe/handlers.go @@ -334,7 +334,7 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r } klog.V(2).Infof("%s: %s => storage.Add", li.LogOrigin, method) - idx, err := li.storage.Add(ctx, entry) + idx, err := li.storage.Add(ctx, entry)() if err != nil { if errors.Is(err, tessera.ErrPushback) { w.Header().Add("Retry-After", "1") diff --git a/personalities/sctfe/handlers_test.go b/personalities/sctfe/handlers_test.go index cca6f1ff..fddb747d 100644 --- a/personalities/sctfe/handlers_test.go +++ b/personalities/sctfe/handlers_test.go @@ -294,7 +294,7 @@ func TestAddChainWhitespace(t *testing.T) { t.Run(test.descr, func(t *testing.T) { if test.want == http.StatusOK { info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, nil) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, nil }) } recorder := httptest.NewRecorder() @@ -368,7 +368,7 @@ func TestAddChain(t *testing.T) { req, leafChain := parseChain(t, false, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) } recorder := makeAddChainRequest(t, info.li, chain) @@ -457,7 +457,7 @@ func TestAddPrechain(t *testing.T) { req, leafChain := parseChain(t, true, test.chain, info.roots.RawCertificates()[0]) rsp := uint64(0) info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil) - info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err) + info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err }) } recorder := makeAddPrechainRequest(t, info.li, chain) diff --git a/personalities/sctfe/mockstorage/mock_ct_storage.go b/personalities/sctfe/mockstorage/mock_ct_storage.go index 373556f0..75509fd8 100644 --- a/personalities/sctfe/mockstorage/mock_ct_storage.go +++ b/personalities/sctfe/mockstorage/mock_ct_storage.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" x509 "github.com/google/certificate-transparency-go/x509" + tessera "github.com/transparency-dev/trillian-tessera" ctonly "github.com/transparency-dev/trillian-tessera/ctonly" ) @@ -37,12 +38,11 @@ func (m *MockStorage) EXPECT() *MockStorageMockRecorder { } // Add mocks base method. -func (m *MockStorage) Add(arg0 context.Context, arg1 *ctonly.Entry) (uint64, error) { +func (m *MockStorage) Add(arg0 context.Context, arg1 *ctonly.Entry) tessera.IndexFuture { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Add", arg0, arg1) - ret0, _ := ret[0].(uint64) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(tessera.IndexFuture) + return ret0 } // Add indicates an expected call of Add. diff --git a/personalities/sctfe/storage.go b/personalities/sctfe/storage.go index b204d68b..9bd3d75c 100644 --- a/personalities/sctfe/storage.go +++ b/personalities/sctfe/storage.go @@ -36,8 +36,8 @@ const ( // Storage provides all the storage primitives necessary to write to a ct-static-api log. type Storage interface { - // Add assigns an index to the provided Entry, stages the entry for integration, and return it the assigned index. - Add(context.Context, *ctonly.Entry) (uint64, error) + // Add assigns an index to the provided Entry, stages the entry for integration, and returns a future for the assigned index. + Add(context.Context, *ctonly.Entry) tessera.IndexFuture // AddIssuerChain stores every the chain certificate in a content-addressable store under their sha256 hash. AddIssuerChain(context.Context, []*x509.Certificate) error } @@ -54,7 +54,7 @@ type IssuerStorage interface { // CTStorage implements Storage. type CTStorage struct { - storeData func(context.Context, *ctonly.Entry) (uint64, error) + storeData func(context.Context, *ctonly.Entry) tessera.IndexFuture storeIssuers func(context.Context, []KV) error } @@ -68,7 +68,7 @@ func NewCTSTorage(logStorage tessera.Storage, issuerStorage IssuerStorage) (*CTS } // Add stores CT entries. -func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) (uint64, error) { +func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) tessera.IndexFuture { // TODO(phboneff): add deduplication and chain storage return cts.storeData(ctx, entry) } From 30897f05b6e540c6bfa11aec321ae772b2812306 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 29 Aug 2024 10:26:27 +0000 Subject: [PATCH 09/10] Add comment about futures --- log.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/log.go b/log.go index 26c37efa..37a1d35f 100644 --- a/log.go +++ b/log.go @@ -46,6 +46,9 @@ type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error) type EntriesPathFunc func(n, logSize uint64) string // IndexFuture is the signature of a function which can return an assigned index or error. +// +// Implementations of this func are likely to be "futures", or a promise to return this data at +// some point in the future, and as such will block when called if the data isn't yet available. type IndexFuture func() (uint64, error) // StorageOptions holds optional settings for all storage implementations. From 83418b324bfe84647f16d3aed78edd6eb5302ce3 Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Thu, 29 Aug 2024 17:00:54 +0000 Subject: [PATCH 10/10] POSIX f --- cmd/example-posix/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/example-posix/main.go b/cmd/example-posix/main.go index c6b20077..fa9d9b79 100644 --- a/cmd/example-posix/main.go +++ b/cmd/example-posix/main.go @@ -142,7 +142,7 @@ func main() { klog.Warningf("/add: %v", err) } }() - idx, err := storage.Add(r.Context(), tessera.NewEntry(b)) + idx, err := storage.Add(r.Context(), tessera.NewEntry(b))() if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error()))