Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Surface futures from Add() #182

Merged
merged 10 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/example-gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
defer r.Body.Close()

id := sha256.Sum256(b)
idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))
idx, err := storage.Add(r.Context(), tessera.NewEntry(b, tessera.WithIdentity(id[:])))()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
w.Header().Add("Retry-After", "1")
Expand Down
2 changes: 1 addition & 1 deletion cmd/example-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func main() {
klog.Warningf("/add: %v", err)
}
}()
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion cmd/example-posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func main() {
klog.Warningf("/add: %v", err)
}
}()
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))
idx, err := storage.Add(r.Context(), tessera.NewEntry(b))()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
Expand Down
46 changes: 16 additions & 30 deletions cmd/posix-oneshot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"time"

"golang.org/x/mod/sumdb/note"
Expand Down Expand Up @@ -139,7 +138,7 @@ func main() {
}
return cp.Size, cp.Hash, nil
}
st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(256, time.Second))
st := posix.New(ctx, *storageDir, readCP, tessera.WithCheckpointSignerVerifier(s, v), tessera.WithBatching(uint(len(toAdd)), time.Second))

// sequence entries

Expand All @@ -149,41 +148,28 @@ func main() {
// sequence numbers assigned to the data from the provided input files.
type entryInfo struct {
name string
e *tessera.Entry
f tessera.IndexFuture
}
entryChan := make(chan entryInfo, 100)
go func() {
for _, fp := range toAdd {
b, err := os.ReadFile(fp)
if err != nil {
klog.Exitf("Failed to read entry file %q: %q", fp, err)
}
entryChan <- entryInfo{name: fp, e: tessera.NewEntry(b)}
for _, fp := range toAdd {
b, err := os.ReadFile(fp)
if err != nil {
klog.Exitf("Failed to read entry file %q: %q", fp, err)
}
close(entryChan)
}()

numWorkers := 256
if l := len(toAdd); l < numWorkers {
numWorkers = l
// ask storage to sequence, we'll put the future we get back into the entryChan for later...
f := st.Add(ctx, tessera.NewEntry(b))
entryChan <- entryInfo{name: fp, f: f}
}
close(entryChan)

wg := sync.WaitGroup{}
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for entry := range entryChan {
// ask storage to sequence
seq, err := st.Add(context.Background(), entry.e)
if err != nil {
klog.Exitf("failed to sequence %q: %q", entry.name, err)
}
klog.Infof("%d: %v", seq, entry.name)
}
}()
for entry := range entryChan {
seq, err := entry.f()
if err != nil {
klog.Exitf("failed to sequence %q: %q", entry.name, err)
}
klog.Infof("%d: %v", seq, entry.name)
}
wg.Wait()
}

func getKeyFile(path string) (string, error) {
Expand Down
8 changes: 4 additions & 4 deletions ct_only.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (

// Storage described the expected functions from Tessera storage implementations.
type Storage interface {
// Add should duably assign an index to the provided Entry, and return it.
// Add should duably assign an index to the provided Entry, returning a future to access that value.
//
// Implementations MUST call MarshalBundleData method on the entry before persisting/integrating it.
Add(context.Context, *Entry) (uint64, error)
Add(context.Context, *Entry) IndexFuture
}

// NewCertificateTransparencySequencedWriter returns a function which knows how to add a CT-specific entry type to the log.
Expand All @@ -38,8 +38,8 @@ type Storage interface {
// b) is not compatible with the https://c2sp.org/tlog-tiles API which we _very strongly_ encourage you to use instead.
//
// Returns the assigned index in the log, or an error.
func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) (uint64, error) {
return func(ctx context.Context, e *ctonly.Entry) (uint64, error) {
func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) IndexFuture {
return func(ctx context.Context, e *ctonly.Entry) IndexFuture {
return s.Add(ctx, convertCTEntry(e))
}
}
Expand Down
2 changes: 1 addition & 1 deletion integration/storage_uniformity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

type StorageContract interface {
Add(ctx context.Context, entry *tessera.Entry) (uint64, error)
Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture
}

var (
Expand Down
6 changes: 6 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type ParseCPFunc func(raw []byte) (*f_log.Checkpoint, error)
// EntriesPathFunc is the signature of a function which knows how to format entry bundle paths.
type EntriesPathFunc func(n, logSize uint64) string

// IndexFuture is the signature of a function which can return an assigned index or error.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Obvious to people that know about futures, but worth adding a comment to mention that this will block?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good call - done.

//
// Implementations of this func are likely to be "futures", or a promise to return this data at
// some point in the future, and as such will block when called if the data isn't yet available.
type IndexFuture func() (uint64, error)

// StorageOptions holds optional settings for all storage implementations.
type StorageOptions struct {
NewCP NewCPFunc
Expand Down
2 changes: 1 addition & 1 deletion personalities/sctfe/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func addChainInternal(ctx context.Context, li *logInfo, w http.ResponseWriter, r
}

klog.V(2).Infof("%s: %s => storage.Add", li.LogOrigin, method)
idx, err := li.storage.Add(ctx, entry)
idx, err := li.storage.Add(ctx, entry)()
if err != nil {
if errors.Is(err, tessera.ErrPushback) {
w.Header().Add("Retry-After", "1")
Expand Down
6 changes: 3 additions & 3 deletions personalities/sctfe/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestAddChainWhitespace(t *testing.T) {
t.Run(test.descr, func(t *testing.T) {
if test.want == http.StatusOK {
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, nil })
}

recorder := httptest.NewRecorder()
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestAddChain(t *testing.T) {
req, leafChain := parseChain(t, false, test.chain, info.roots.RawCertificates()[0])
rsp := uint64(0)
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err })
}

recorder := makeAddChainRequest(t, info.li, chain)
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestAddPrechain(t *testing.T) {
req, leafChain := parseChain(t, true, test.chain, info.roots.RawCertificates()[0])
rsp := uint64(0)
info.storage.EXPECT().AddIssuerChain(deadlineMatcher(), cmpMatcher{leafChain[1:]}).Return(nil)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(rsp, test.err)
info.storage.EXPECT().Add(deadlineMatcher(), cmpMatcher{req}).Return(func() (uint64, error) { return rsp, test.err })
}

recorder := makeAddPrechainRequest(t, info.li, chain)
Expand Down
8 changes: 4 additions & 4 deletions personalities/sctfe/mockstorage/mock_ct_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions personalities/sctfe/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (

// Storage provides all the storage primitives necessary to write to a ct-static-api log.
type Storage interface {
// Add assigns an index to the provided Entry, stages the entry for integration, and return it the assigned index.
Add(context.Context, *ctonly.Entry) (uint64, error)
// Add assigns an index to the provided Entry, stages the entry for integration, and returns a future for the assigned index.
Add(context.Context, *ctonly.Entry) tessera.IndexFuture
// AddIssuerChain stores every the chain certificate in a content-addressable store under their sha256 hash.
AddIssuerChain(context.Context, []*x509.Certificate) error
}
Expand All @@ -54,7 +54,7 @@ type IssuerStorage interface {

// CTStorage implements Storage.
type CTStorage struct {
storeData func(context.Context, *ctonly.Entry) (uint64, error)
storeData func(context.Context, *ctonly.Entry) tessera.IndexFuture
storeIssuers func(context.Context, []KV) error
}

Expand All @@ -68,7 +68,7 @@ func NewCTSTorage(logStorage tessera.Storage, issuerStorage IssuerStorage) (*CTS
}

// Add stores CT entries.
func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) (uint64, error) {
func (cts *CTStorage) Add(ctx context.Context, entry *ctonly.Entry) tessera.IndexFuture {
// TODO(phboneff): add deduplication and chain storage
return cts.storeData(ctx, entry)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions))
}

// Add is the entrypoint for adding entries to a sequencing log.
func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) {
return s.queue.Add(ctx, e)()
func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
return s.queue.Add(ctx, e)
}

// Get returns the requested object.
Expand Down
4 changes: 2 additions & 2 deletions storage/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func (s *Storage) writeEntryBundle(ctx context.Context, tx *sql.Tx, index uint64
}

// Add is the entrypoint for adding entries to a sequencing log.
func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) (uint64, error) {
return s.queue.Add(ctx, entry)()
func (s *Storage) Add(ctx context.Context, entry *tessera.Entry) tessera.IndexFuture {
return s.queue.Add(ctx, entry)
}

// sequenceBatch writes the entries from the provided batch into the entry bundle files of the log.
Expand Down
6 changes: 3 additions & 3 deletions storage/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestParallelAdd(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
for i := 0; i < 1024; i++ {
go func() {
if _, err := s.Add(ctx, tessera.NewEntry(test.entry)); err != nil {
if _, err := s.Add(ctx, tessera.NewEntry(test.entry))(); err != nil {
t.Errorf("got err: %v", err)
}
}()
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestTileRoundTrip(t *testing.T) {
},
} {
t.Run(test.name, func(t *testing.T) {
entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))
entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))()
if err != nil {
t.Errorf("Add got err: %v", err)
}
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestEntryBundleRoundTrip(t *testing.T) {
},
} {
t.Run(test.name, func(t *testing.T) {
entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))
entryIndex, err := s.Add(ctx, tessera.NewEntry(test.entry))()
if err != nil {
t.Errorf("Add got err: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/posix/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (s *Storage) unlockCP() error {

// Add commits to sequence numbers for an entry
// Returns the sequence number assigned to the first entry in the batch, or an error.
func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) {
return s.queue.Add(ctx, e)()
func (s *Storage) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
return s.queue.Add(ctx, e)
}

// GetEntryBundle retrieves the Nth entries bundle for a log of the given size.
Expand Down
11 changes: 4 additions & 7 deletions storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type Queue struct {
inFlight map[string]*queueItem
}

// Future is a function which returns an assigned log index, or an error.
type Future func() (idx uint64, err error)

// FlushFunc is the signature of a function which will receive the slice of queued entries.
// Normally, this function would be provided by storage implementations. It's important to note
// that the implementation MUST call each entry's MarshalBundleData function before attempting
Expand Down Expand Up @@ -116,7 +113,7 @@ func (q *Queue) squashDupes(e *tessera.Entry) (*queueItem, bool) {
}

// Add places e into the queue, and returns a func which may be called to retrieve the assigned index.
func (q *Queue) Add(ctx context.Context, e *tessera.Entry) Future {
func (q *Queue) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
entry, isDupe := q.squashDupes(e)
if isDupe {
// This entry is already in the queue, so no need to add it again.
Expand Down Expand Up @@ -154,15 +151,15 @@ func (q *Queue) doFlush(ctx context.Context, entries []*queueItem) {
// hang until assign is called.
type queueItem struct {
entry *tessera.Entry
c chan Future
f Future
c chan tessera.IndexFuture
f tessera.IndexFuture
}

// newEntry creates a new entry for the provided data.
func newEntry(data *tessera.Entry) *queueItem {
e := &queueItem{
entry: data,
c: make(chan Future, 1),
c: make(chan tessera.IndexFuture, 1),
}
e.f = sync.OnceValues(func() (uint64, error) {
return (<-e.c)()
Expand Down
4 changes: 2 additions & 2 deletions storage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestQueue(t *testing.T) {
q := storage.NewQueue(ctx, test.maxWait, uint(test.maxEntries), flushFunc)

// Now submit a bunch of entries
adds := make([]storage.Future, test.numItems)
adds := make([]tessera.IndexFuture, test.numItems)
wantEntries := make([]*tessera.Entry, test.numItems)
for i := uint64(0); i < test.numItems; i++ {
d := []byte(fmt.Sprintf("item %d", i))
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestDedup(t *testing.T) {
})

numEntries := 10
adds := []storage.Future{}
adds := []tessera.IndexFuture{}
for i := 0; i < numEntries; i++ {
adds = append(adds, q.Add(ctx, tessera.NewEntry([]byte("Have I seen this before?"))))
}
Expand Down
Loading