Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code-standards(submit loop): small refactor to submit loop to move timer to submitter thread #1014

Merged
merged 2 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ func SubmitLoopInner(
submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread

eg.Go(func() error {
// 'trigger': we need one thread to continuously consume the bytes produced channel, and to monitor timer
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since the other thread keeps track of the actual time
defer ticker.Stop()
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
// too much stuff is pending submission
Expand All @@ -63,13 +62,6 @@ func SubmitLoopInner(
case <-ctx.Done():
return ctx.Err()
case <-trigger.C:
case <-ticker.C:
// It's theoretically possible for the thread scheduler to pause this thread after entering this if statement
// for enough time for the submitter thread to submit all the pending bytes and do the nudge, and then for the
// thread scheduler to wake up this thread after the nudge has been missed, which would be a deadlock.
// Although this is only a theoretical possibility which should never happen in practice, it may be possible, e.g.
// in adverse CPU conditions or tests using compressed timeframes. To be sound, we also nudge with the ticker, which
// has no downside.
}
} else {
select {
Expand All @@ -78,26 +70,27 @@ func SubmitLoopInner(
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Info("Added bytes produced to bytes pending submission counter.", "n", n)
case <-ticker.C:
}
}

types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))
submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches
// 'submitter': this thread actually creates and submits batches, and will do so on a timer if it isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the submitter need a ticker just to wake up?
You should move the maxDataNotExceeded and lastSubmissionIsRecent checks to the trigger thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 'should' I?
You need to actually argue and prove it works

Copy link
Contributor Author

@danwt danwt Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway, are you happy to merge this? It's an improvement regardless.

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-submitter.C:
}
pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
Expand All @@ -117,6 +110,7 @@ func SubmitLoopInner(
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
Expand Down
6 changes: 3 additions & 3 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func testSubmitLoopInner(
nProducedBytes.Add(^uint64(consumed - 1)) // subtract

timeLastProgressT := time.Unix(timeLastProgress.Load(), 0)
absoluteMax := int64(1.5 * float64(args.maxTime)) // allow some leeway for code execution
absoluteMax := int64(2 * float64(args.maxTime)) // allow some leeway for code execution. Tests may run on small boxes (GH actions)
timeSinceLast := time.Since(timeLastProgressT).Milliseconds()
require.True(t, timeSinceLast < absoluteMax, "too long since last update", "timeSinceLast", timeSinceLast, "max", absoluteMax)

Expand All @@ -115,7 +115,7 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
testSubmitLoop(
t,
testArgs{
nParallel: 100,
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchBytes: 100,
Expand All @@ -136,7 +136,7 @@ func TestSubmitLoopTimer(t *testing.T) {
testSubmitLoop(
t,
testArgs{
nParallel: 100,
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchBytes: 100,
Expand Down
Loading