From cf5b45e1794a676124c74f29f289a62edf613e11 Mon Sep 17 00:00:00 2001 From: Matthias Glastra Date: Tue, 30 Apr 2024 18:10:08 +0200 Subject: [PATCH] feat: Parallel attestors per type. Signed-off-by: Matthias Glastra --- attestation/context.go | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/attestation/context.go b/attestation/context.go index 0e916731..969805a3 100644 --- a/attestation/context.go +++ b/attestation/context.go @@ -19,6 +19,7 @@ import ( "crypto" "fmt" "os" + "sync" "time" "github.com/in-toto/go-witness/cryptoutil" @@ -98,6 +99,7 @@ type AttestationContext struct { products map[string]Product materials map[string]cryptoutil.DigestSet stepName string + mutex sync.RWMutex } type Product struct { @@ -149,45 +151,68 @@ func (ctx *AttestationContext) RunAttestors() error { } for _, k := range order { - log.Debugf("Starting %s attestors...", k.String()) + log.Infof("Starting %s attestors stage...", k.String()) + + var wg sync.WaitGroup + ch := make(chan int, len(attestors)) + for _, att := range attestors[k] { - log.Infof("Starting %v attestor...", att.Name()) - ctx.runAttestor(att) + wg.Add(1) + go func(att Attestor) { + defer func() { wg.Done(); <-ch }() + ctx.runAttestor(att) + }(att) } + wg.Wait() + log.Infof("Completed %s attestors stage...", k.String()) } return nil } func (ctx *AttestationContext) runAttestor(attestor Attestor) { + log.Infof("Starting %v attestor...", attestor.Name()) + startTime := time.Now() if err := attestor.Attest(ctx); err != nil { + ctx.mutex.Lock() ctx.completedAttestors = append(ctx.completedAttestors, CompletedAttestor{ Attestor: attestor, StartTime: startTime, EndTime: time.Now(), Error: err, }) + ctx.mutex.Unlock() } + ctx.mutex.Lock() ctx.completedAttestors = append(ctx.completedAttestors, CompletedAttestor{ Attestor: attestor, StartTime: startTime, EndTime: time.Now(), }) + ctx.mutex.Unlock() if materialer, ok := attestor.(Materialer); ok { + ctx.mutex.Lock() ctx.addMaterials(materialer) + ctx.mutex.Unlock() } if producer, ok := attestor.(Producer); ok { + ctx.mutex.Lock() ctx.addProducts(producer) + ctx.mutex.Unlock() } + + log.Infof("Finished %v attestor... (%vs)", attestor.Name(), time.Since(startTime).Seconds()) } func (ctx *AttestationContext) CompletedAttestors() []CompletedAttestor { + ctx.mutex.RLock() out := make([]CompletedAttestor, len(ctx.completedAttestors)) copy(out, ctx.completedAttestors) + ctx.mutex.RUnlock() return out } @@ -196,8 +221,10 @@ func (ctx *AttestationContext) WorkingDir() string { } func (ctx *AttestationContext) Hashes() []cryptoutil.DigestValue { + ctx.mutex.RLock() hashes := make([]cryptoutil.DigestValue, len(ctx.hashes)) copy(hashes, ctx.hashes) + ctx.mutex.RUnlock() return hashes } @@ -206,18 +233,22 @@ func (ctx *AttestationContext) Context() context.Context { } func (ctx *AttestationContext) Materials() map[string]cryptoutil.DigestSet { + ctx.mutex.RLock() out := make(map[string]cryptoutil.DigestSet) for k, v := range ctx.materials { out[k] = v } + ctx.mutex.RUnlock() return out } func (ctx *AttestationContext) Products() map[string]Product { + ctx.mutex.RLock() out := make(map[string]Product) for k, v := range ctx.products { out[k] = v } + ctx.mutex.RUnlock() return out }