From a79c2576c6ca29e7afe6abffbdfeb5eb88355c76 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 21 Oct 2024 12:25:30 +0200 Subject: [PATCH] worker: goto loop if there are new slabs to upload --- worker/upload.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/worker/upload.go b/worker/upload.go index 547d872e8..2a0407fb1 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -238,6 +238,7 @@ func (w *Worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe interruptCtx, interruptCancel := context.WithCancel(w.shutdownCtx) defer interruptCancel() +loop: var wg sync.WaitGroup for { // block until we have memory @@ -287,6 +288,14 @@ func (w *Worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe // wait for all threads to finish wg.Wait() + // no need to close out the thread if there's new packed slabs for upload + packedSlabs, err := w.bus.PackedSlabsForUpload(interruptCtx, defaultPackedSlabsLockDuration, uint8(rs.MinShards), uint8(rs.TotalShards), contractSet, 1) + if err != nil { + w.logger.Errorf("couldn't fetch packed slabs from bus: %v", err) + } else if len(packedSlabs) > 0 { + goto loop + } + fmt.Printf("DEBUG PJ: THREAD %v DONE\n", key) }