-
Notifications
You must be signed in to change notification settings - Fork 4
/
jobqueue.go
103 lines (89 loc) · 2.29 KB
/
jobqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package jobqueue
import (
"sync"
)
// JobQueue represents a queue for enqueueing jobs to be processed
type JobQueue struct {
internalQueue chan Job
readyPool chan chan Job
workers []*Worker
dispatcherStopped *sync.WaitGroup
workersStopped *sync.WaitGroup
quit chan bool
stopped bool
stoppedMutex *sync.Mutex
}
// NewJobQueue creates a new job queue
func NewJobQueue(maxWorkers int) *JobQueue {
workersStopped := &sync.WaitGroup{}
readyPool := make(chan chan Job, maxWorkers)
workers := make([]*Worker, maxWorkers, maxWorkers)
for i := 0; i < maxWorkers; i++ {
workers[i] = NewWorker(readyPool, workersStopped)
}
return &JobQueue{
internalQueue: make(chan Job),
readyPool: readyPool,
workers: workers,
dispatcherStopped: &sync.WaitGroup{},
workersStopped: workersStopped,
quit: make(chan bool),
}
}
// Start starts the workers and dispatcher
func (q *JobQueue) Start() {
for i := 0; i < len(q.workers); i++ {
q.workers[i].Start()
}
go q.dispatch()
q.stoppedMutex = &sync.Mutex{}
q.setStopped(false)
}
// Stop stops the workers and dispatcher
// Also closes internal job queue channel after the last value has been received
func (q *JobQueue) Stop() {
q.setStopped(true)
// Stopping queue
q.quit <- true
q.dispatcherStopped.Wait()
// Stopped queue
close(q.internalQueue)
}
// Stopped indicates wether the queue is stopped
func (q *JobQueue) Stopped() bool {
return q.getStopped()
}
func (q *JobQueue) setStopped(s bool) {
q.stoppedMutex.Lock()
q.stopped = s
q.stoppedMutex.Unlock()
}
func (q *JobQueue) getStopped() bool {
q.stoppedMutex.Lock()
s := q.stopped
q.stoppedMutex.Unlock()
return s
}
func (q *JobQueue) dispatch() {
q.dispatcherStopped.Add(1)
for {
select {
case job := <-q.internalQueue: // we got something in on our queue
workerChannel := <-q.readyPool // check out an available worker
workerChannel <- job // send the request to the channel
case <-q.quit:
for i := 0; i < len(q.workers); i++ {
q.workers[i].Stop()
}
q.workersStopped.Wait()
q.dispatcherStopped.Done()
return
}
}
}
// Submit submits a new job to be processed to the internal queue
func (q *JobQueue) Submit(job Job) {
if !q.getStopped() {
q.internalQueue <- job
}
}