Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block builder scheduler startup: learn state from worker updates #9897

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4783226
Take in changes from davidgrant/block-builder-scheduler-backlog
seizethedave Nov 4, 2024
ff2ee11
Tighten up updateInterval.
seizethedave Nov 5, 2024
f734096
Add startup phase. Add epoch to job key to detect startup conflicts.
seizethedave Nov 5, 2024
9ff6ac2
Startup/recovery mode. Listen to updates and convert to jobQueue at t…
seizethedave Nov 5, 2024
c9ebaf5
better JobNotAssigned message.
seizethedave Nov 5, 2024
d16ecd2
better name
seizethedave Nov 7, 2024
e16bdee
Tidy up observation-mode completion. Tidy up tests.
seizethedave Nov 7, 2024
ad6881d
Merge.
seizethedave Nov 8, 2024
1911495
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Nov 8, 2024
44db959
Initial testing of observation code.
seizethedave Nov 9, 2024
356194b
Restore 517.
seizethedave Nov 9, 2024
4fa848c
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Nov 12, 2024
d259e22
Epoch->int64; leaseTime->leaseExpiry.
seizethedave Nov 12, 2024
95006f7
job queue nil until observation mode over.
seizethedave Nov 12, 2024
8b352a6
Fetch Kafka offsets at startup.
seizethedave Nov 13, 2024
4eca9cb
Improve observation mode finalization.
seizethedave Nov 13, 2024
3c63143
Merge remote-tracking branch 'origin/main' into davidgrant/block-buil…
seizethedave Nov 13, 2024
f2bf7aa
Better tests for observation mode.
seizethedave Nov 13, 2024
5f40759
Remove redundant test.
seizethedave Nov 13, 2024
d59d02e
Improve the observation test.
seizethedave Nov 13, 2024
9393468
lint & tweaks.
seizethedave Nov 13, 2024
98cb58c
Remove offset-tracking code that doesn't work correctly.
seizethedave Nov 13, 2024
9c3ad6c
ded code
seizethedave Nov 13, 2024
826d738
Remove ref to old job updates.
seizethedave Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 87 additions & 23 deletions pkg/blockbuilder/scheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,92 @@ import (
var (
errNoJobAvailable = errors.New("no job available")
errJobNotFound = errors.New("job not found")
errJobNotAssigned = errors.New("job not assigned to worker")
errJobNotAssigned = errors.New("job not assigned to given worker")
errBadEpoch = errors.New("bad epoch")
)

type jobQueue struct {
leaseTime time.Duration
logger log.Logger
leaseExpiry time.Duration
logger log.Logger

mu sync.Mutex
epoch int64
jobs map[string]*job
unassigned jobHeap
}

func newJobQueue(leaseTime time.Duration, logger log.Logger) *jobQueue {
func newJobQueue(leaseExpiry time.Duration, logger log.Logger) *jobQueue {
return &jobQueue{
leaseTime: leaseTime,
logger: logger,
leaseExpiry: leaseExpiry,
logger: logger,

jobs: make(map[string]*job),
}
}

// assign assigns the highest-priority unassigned job to the given worker.
func (s *jobQueue) assign(workerID string) (string, jobSpec, error) {
func (s *jobQueue) assign(workerID string) (jobKey, jobSpec, error) {
if workerID == "" {
return "", jobSpec{}, errors.New("workerID cannot not be empty")
return jobKey{}, jobSpec{}, errors.New("workerID cannot be empty")
}

s.mu.Lock()
defer s.mu.Unlock()

if s.unassigned.Len() == 0 {
return "", jobSpec{}, errNoJobAvailable
return jobKey{}, jobSpec{}, errNoJobAvailable
}

j := heap.Pop(&s.unassigned).(*job)
j.key.epoch = s.epoch
s.epoch++
j.assignee = workerID
j.leaseExpiry = time.Now().Add(s.leaseTime)
return j.id, j.spec, nil
j.leaseExpiry = time.Now().Add(s.leaseExpiry)
return j.key, j.spec, nil
}

// importJob imports a job with the given ID and spec into the jobQueue. This is
// meant to be used during recovery, when we're reconstructing the jobQueue from
// worker updates.
func (s *jobQueue) importJob(key jobKey, workerID string, spec jobSpec) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
return errors.New("workerID cannot be empty")
}

s.mu.Lock()
defer s.mu.Unlock()

// When we start assigning new jobs, the epochs need to be compatible with
// these "imported" jobs.
s.epoch = max(s.epoch, key.epoch+1)

j, ok := s.jobs[key.id]
if ok {
if key.epoch < j.key.epoch {
return errBadEpoch
} else if key.epoch == j.key.epoch {
if j.assignee != workerID {
return errJobNotAssigned
}
} else {
// Otherwise, this caller is the new authority, so we accept the update.
j.assignee = workerID
j.key = key
j.spec = spec
}
} else {
s.jobs[key.id] = &job{
key: key,
assignee: workerID,
leaseExpiry: time.Now().Add(s.leaseExpiry),
failCount: 0,
spec: spec,
}
}
return nil
}

// addOrUpdate adds a new job or updates an existing job with the given spec.
Expand All @@ -60,18 +108,21 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {
defer s.mu.Unlock()

if j, ok := s.jobs[id]; ok {
// We can only update an unassigned job.
if j.assignee == "" {
// We can only update an unassigned job.
j.spec = spec
}
return
}

// Otherwise, add a new job.
j := &job{
id: id,
key: jobKey{
id: id,
epoch: 0,
},
assignee: "",
leaseExpiry: time.Now().Add(s.leaseTime),
leaseExpiry: time.Now().Add(s.leaseExpiry),
failCount: 0,
spec: spec,
}
Expand All @@ -81,8 +132,8 @@ func (s *jobQueue) addOrUpdate(id string, spec jobSpec) {

// renewLease renews the lease of the job with the given ID for the given
// worker.
func (s *jobQueue) renewLease(jobID, workerID string) error {
if jobID == "" {
func (s *jobQueue) renewLease(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
Expand All @@ -92,22 +143,25 @@ func (s *jobQueue) renewLease(jobID, workerID string) error {
s.mu.Lock()
defer s.mu.Unlock()

j, ok := s.jobs[jobID]
j, ok := s.jobs[key.id]
if !ok {
return errJobNotFound
}
if j.assignee != workerID {
return errJobNotAssigned
}
if j.key.epoch != key.epoch {
return errBadEpoch
}

j.leaseExpiry = time.Now().Add(s.leaseTime)
j.leaseExpiry = time.Now().Add(s.leaseExpiry)
return nil
}

// completeJob completes the job with the given ID for the given worker,
// removing it from the jobQueue.
func (s *jobQueue) completeJob(jobID, workerID string) error {
if jobID == "" {
func (s *jobQueue) completeJob(key jobKey, workerID string) error {
if key.id == "" {
return errors.New("jobID cannot be empty")
}
if workerID == "" {
Expand All @@ -117,15 +171,18 @@ func (s *jobQueue) completeJob(jobID, workerID string) error {
s.mu.Lock()
defer s.mu.Unlock()

j, ok := s.jobs[jobID]
j, ok := s.jobs[key.id]
if !ok {
return errJobNotFound
}
if j.assignee != workerID {
return errJobNotAssigned
}
if j.key.epoch != key.epoch {
return errBadEpoch
}

delete(s.jobs, jobID)
delete(s.jobs, key.id)
return nil
}

Expand All @@ -147,7 +204,7 @@ func (s *jobQueue) clearExpiredLeases() {
}

type job struct {
id string
key jobKey

assignee string
leaseExpiry time.Time
Expand All @@ -157,6 +214,13 @@ type job struct {
spec jobSpec
}

type jobKey struct {
id string
// The assignment epoch. This is used to break ties when multiple workers
// have knowledge of the same job.
epoch int64
}

type jobSpec struct {
topic string
partition int32
Expand Down
86 changes: 56 additions & 30 deletions pkg/blockbuilder/scheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,117 +17,143 @@ import (
func TestAssign(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))

j0id, j0spec, err := s.assign("w0")
require.Empty(t, j0id)
j0, j0spec, err := s.assign("w0")
require.Empty(t, j0.id)
require.Zero(t, j0spec)
require.ErrorIs(t, err, errNoJobAvailable)

s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
j1id, j1spec, err := s.assign("w0")
require.NotEmpty(t, j1id)
j1, j1spec, err := s.assign("w0")
require.NotEmpty(t, j1.id)
require.NotZero(t, j1spec)
require.NoError(t, err)
require.Equal(t, "w0", s.jobs[j1id].assignee)
require.Equal(t, "w0", s.jobs[j1.id].assignee)

j2id, j2spec, err := s.assign("w0")
require.Zero(t, j2id)
j2, j2spec, err := s.assign("w0")
require.Zero(t, j2.id)
require.Zero(t, j2spec)
require.ErrorIs(t, err, errNoJobAvailable)

s.addOrUpdate("job2", jobSpec{topic: "hello2", commitRecTs: time.Now()})
j3id, j3spec, err := s.assign("w0")
require.NotZero(t, j3id)
j3, j3spec, err := s.assign("w0")
require.NotZero(t, j3.id)
require.NotZero(t, j3spec)
require.NoError(t, err)
require.Equal(t, "w0", s.jobs[j3id].assignee)
require.Equal(t, "w0", s.jobs[j3.id].assignee)
}

func TestAssignComplete(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))

{
err := s.completeJob("rando job", "w0")
err := s.completeJob(jobKey{"rando job", 965}, "w0")
require.ErrorIs(t, err, errJobNotFound)
}

s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
jid, jspec, err := s.assign("w0")
require.NotZero(t, jid)
jk, jspec, err := s.assign("w0")
require.NotZero(t, jk)
require.NotZero(t, jspec)
require.NoError(t, err)
j, ok := s.jobs[jid]
j, ok := s.jobs[jk.id]
require.True(t, ok)
require.Equal(t, "w0", j.assignee)

{
err := s.completeJob("rando job", "w0")
err := s.completeJob(jobKey{"rando job", 64}, "w0")
require.ErrorIs(t, err, errJobNotFound)
}
{
err := s.completeJob(j.id, "rando worker")
err := s.completeJob(jk, "rando worker")
require.ErrorIs(t, err, errJobNotAssigned)
}
{
err := s.completeJob(jobKey{jk.id, 9999}, "w0")
require.ErrorIs(t, err, errBadEpoch)
}

{
err := s.completeJob(j.id, "w0")
err := s.completeJob(jk, "w0")
require.NoError(t, err)

err2 := s.completeJob(j.id, "w0")
err2 := s.completeJob(jk, "w0")
require.ErrorIs(t, err2, errJobNotFound)
}

j2id, j2spec, err := s.assign("w0")
require.Zero(t, j2id, "should be no job available")
j2k, j2spec, err := s.assign("w0")
require.Zero(t, j2k.id, "should be no job available")
require.Zero(t, j2spec, "should be no job available")
require.ErrorIs(t, err, errNoJobAvailable)
}

func TestLease(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))
s.addOrUpdate("job1", jobSpec{topic: "hello", commitRecTs: time.Now()})
jid, jspec, err := s.assign("w0")
require.NotZero(t, jid)
jk, jspec, err := s.assign("w0")
require.NotZero(t, jk.id)
require.NotZero(t, jspec)
require.NoError(t, err)

j, ok := s.jobs[jid]
j, ok := s.jobs[jk.id]
require.True(t, ok)
require.Equal(t, "w0", j.assignee)

// Expire the lease.
j.leaseExpiry = time.Now().Add(-1 * time.Minute)
s.clearExpiredLeases()

j2id, j2spec, err := s.assign("w1")
require.NotZero(t, j2id, "should be able to assign a job whose lease was invalidated")
j2k, j2spec, err := s.assign("w1")
require.NotZero(t, j2k.id, "should be able to assign a job whose lease was invalidated")
require.NotZero(t, j2spec, "should be able to assign a job whose lease was invalidated")
require.Equal(t, j.spec, j2spec)
require.NoError(t, err)
j2, ok := s.jobs[j2id]
j2, ok := s.jobs[j2k.id]
require.True(t, ok)
require.Equal(t, "w1", j2.assignee)

t.Run("renewals", func(t *testing.T) {
prevExpiry := j2.leaseExpiry
e1 := s.renewLease(j2.id, "w1")
e1 := s.renewLease(j2k, "w1")
require.NoError(t, e1)
require.True(t, j2.leaseExpiry.After(prevExpiry))

e2 := s.renewLease(j2.id, "w0")
e2 := s.renewLease(j2k, "w0")
require.ErrorIs(t, e2, errJobNotAssigned)

e3 := s.renewLease("job_404", "w0")
e3 := s.renewLease(jobKey{"job_404", 1}, "w0")
require.ErrorIs(t, e3, errJobNotFound)
})
}

// TestImportJob tests the importJob method - the method that is called to learn
// about jobs in-flight from a previous scheduler instance.
func TestImportJob(t *testing.T) {
s := newJobQueue(988*time.Hour, test.NewTestingLogger(t))
spec := jobSpec{commitRecTs: time.Now().Add(-1 * time.Hour)}
require.NoError(t, s.importJob(jobKey{"job1", 122}, "w0", spec))
require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec))
require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 122}, "w0", spec))
require.ErrorIs(t, errBadEpoch, s.importJob(jobKey{"job1", 60}, "w98", spec))
require.ErrorIs(t, errJobNotAssigned, s.importJob(jobKey{"job1", 123}, "w512", spec))
require.NoError(t, s.importJob(jobKey{"job1", 123}, "w2", spec))

j, ok := s.jobs["job1"]
require.True(t, ok)
require.Equal(t, jobKey{"job1", 123}, j.key)
require.Equal(t, spec, j.spec)
require.Equal(t, "w2", j.assignee)
}

func TestMinHeap(t *testing.T) {
n := 517
jobs := make([]*job, n)
order := make([]int, n)
for i := 0; i < n; i++ {
jobs[i] = &job{
id: fmt.Sprintf("job%d", i),
key: jobKey{
id: fmt.Sprintf("job%d", i),
epoch: 0,
},
spec: jobSpec{topic: "hello", commitRecTs: time.Unix(int64(i), 0)},
}
order[i] = i
Expand Down
Loading