-
Notifications
You must be signed in to change notification settings - Fork 9
/
queries.go
125 lines (117 loc) · 2.92 KB
/
queries.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package pgq
import (
"database/sql"
"fmt"
"time"
"github.com/guregu/null"
"github.com/jmoiron/sqlx"
"github.com/joomcode/errorx"
)
// getNextJob selects a single pending job from pgq_jobs inside tx.
//
// A job qualifies when its queue_name is one of queueNames, its run_after is
// strictly earlier than the current time, and ran_at is still NULL. Of the
// qualifying rows the one with the smallest run_after is chosen, and the
// statement ends in FOR UPDATE SKIP LOCKED.
//
// It returns (nil, nil) when no row matches, and a decorated error when the
// query cannot be built or executed.
func getNextJob(tx *sqlx.Tx, queueNames []string) (*Job, error) {
	now := time.Now()

	// sqlx.In expands the slice argument into one placeholder per queue name.
	stmt, params, buildErr := sqlx.In(`
SELECT * FROM pgq_jobs
WHERE
queue_name IN (?)
AND run_after < ?
AND ran_at IS NULL
ORDER BY run_after
LIMIT 1
FOR UPDATE SKIP LOCKED;`,
		queueNames,
		now,
	)
	if buildErr != nil {
		return nil, errorx.Decorate(buildErr, "could not create job query")
	}

	// Rewrite the generic ? placeholders into the bindvar style used by tx.
	stmt = tx.Rebind(stmt)

	var next Job
	getErr := tx.Get(&next, stmt, params...)
	if getErr == nil {
		return &next, nil
	}
	if getErr == sql.ErrNoRows {
		// An empty result is not an error: there is simply nothing to run.
		return nil, nil
	}
	return nil, errorx.Decorate(getErr, "could not get next job")
}
// DB is the minimal set of database methods the query helpers in this file
// rely on. Anything exposing these two methods with these signatures can be
// passed in, such as a database handle or a transaction.
type DB interface {
	// Exec runs a statement that returns no rows.
	Exec(query string, args ...interface{}) (sql.Result, error)
	// QueryRow runs a query that is expected to return at most one row.
	QueryRow(query string, args ...interface{}) *sql.Row
}
// enqueueJob inserts a new row into pgq_jobs for the given queue and payload
// and returns the id reported by the INSERT ... RETURNING statement.
//
// The job starts from defaults (RunAfter set to the current time and retry
// waits of 1, 10 and 30 minutes); each supplied option is then applied in
// order and may override them. If applying an option fails, nothing is
// inserted and 0 is returned along with the decorated error.
func enqueueJob(execer DB, queueName string, data []byte, options ...JobOption) (int, error) {
	// Build the job from the caller's data plus the default settings.
	newJob := &Job{
		QueueName: queueName,
		Data:      data,
		RunAfter:  time.Now(),
		// Default retry waits: 1 minute, then 10 minutes, then 30 minutes.
		RetryWaits: []time.Duration{
			1 * time.Minute,
			10 * time.Minute,
			30 * time.Minute,
		},
	}

	// Apply any job customizations provided by the caller.
	for _, opt := range options {
		if optErr := applyJobOption(opt, newJob); optErr != nil {
			return 0, errorx.Decorate(optErr, "error/panic while applying option")
		}
	}

	// Persist the job and read back its generated id.
	row := execer.QueryRow(`
INSERT INTO pgq_jobs (
queue_name,
data,
run_after,
retry_waits
) VALUES (
$1,
$2,
$3,
$4
) RETURNING id;
`, newJob.QueueName, newJob.Data, newJob.RunAfter, newJob.RetryWaits)

	var id int
	scanErr := row.Scan(&id)
	return id, errorx.DecorateMany("could not enqueue job", scanErr)
}
// updateJob records the outcome of a job run on both the in-memory job and
// its pgq_jobs row.
//
// job.RanAt is set to ranAt. job.Error is set to jobErr's message when jobErr
// is non-nil and is otherwise left as it was. The row whose id equals job.ID
// is then updated with those two values.
//
// It returns an error when the UPDATE fails, when no result is returned, when
// the affected-row count cannot be read, or when the statement did not affect
// exactly one row.
func updateJob(execer DB, job *Job, ranAt time.Time, jobErr error) error {
	job.RanAt = null.TimeFrom(ranAt)
	if jobErr != nil {
		job.Error = null.StringFrom(jobErr.Error())
	}

	result, err := execer.Exec(`
UPDATE pgq_jobs SET
ran_at = $1,
error = $2
WHERE
id = $3;`,
		job.RanAt,
		job.Error,
		job.ID,
	)
	if err != nil {
		return errorx.Decorate(err, "could not update job with result")
	}
	if result == nil {
		// There is no underlying error to decorate here, so report the
		// missing result directly.
		return fmt.Errorf("could not update job with result: no result returned")
	}

	// Check the RowsAffected error before looking at the count: on failure
	// the count is not meaningful and must not mask the real error.
	affected, err := result.RowsAffected()
	if err != nil {
		return errorx.Decorate(err, "could not get rows affected")
	}
	if affected != 1 {
		return fmt.Errorf("expected to update 1 job, but updated %d", affected)
	}
	return nil
}
// deleteJob removes the pgq_jobs row whose id equals job.ID.
//
// It returns an error when the DELETE fails, when no result is returned, when
// the affected-row count cannot be read, or when the statement did not delete
// exactly one row.
func deleteJob(execer DB, job *Job) error {
	result, err := execer.Exec(`
DELETE from pgq_jobs
WHERE id = $1;
`, job.ID)
	if err != nil {
		return errorx.Decorate(err, "could not delete job")
	}
	if result == nil {
		// There is no underlying error to decorate here, so report the
		// missing result directly.
		return fmt.Errorf("could not delete job: no result returned")
	}

	// Check the RowsAffected error before looking at the count: on failure
	// the count is not meaningful and must not mask the real error.
	affected, err := result.RowsAffected()
	if err != nil {
		return errorx.Decorate(err, "could not get rows affected")
	}
	if affected != 1 {
		return fmt.Errorf("expected to delete 1 job, but deleted %d", affected)
	}
	return nil
}