forked from adjust/redismq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
package.go
111 lines (97 loc) · 2.41 KB
/
package.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package redismq
import (
"bytes"
"encoding/gob"
"fmt"
"time"
)
// Package provides headers and handling functions around payloads
type Package struct {
Payload []byte
CreatedAt time.Time
Queue *Queue `json:"-"`
Consumer *Consumer `json:"-"`
Collection *[]*Package `json:"-"`
Acked bool `json:"-"`
//TODO add Headers or smth. when needed
//wellle suggested error headers for failed packages
}
func unmarshalPackage(input string, queue *Queue, consumer *Consumer) (*Package, error) {
p := &Package{Queue: queue, Consumer: consumer, Acked: false}
var dec = gob.NewDecoder(bytes.NewBuffer([]byte(input)))
err := dec.Decode(p)
if err != nil {
return nil, err
}
return p, nil
}
func (pack *Package) getString() string {
gob.Register(Package{})
gob.Register(Queue{})
var buffer bytes.Buffer
enc := gob.NewEncoder(&buffer)
if err := enc.Encode(*pack); err != nil {
panic(err)
}
return string(buffer.Bytes())
}
func (pack *Package) index() int {
if pack.Collection == nil {
return 0
}
var i int
for i = range *pack.Collection {
if (*pack.Collection)[i] == pack {
break
}
}
return i
}
// MultiAck removes all packaes from the fetched array up to and including this package
func (pack *Package) MultiAck() (err error) {
if pack.Collection == nil {
return fmt.Errorf("cannot MultiAck single package")
}
// TODO write in lua
for i := 0; i <= pack.index(); i++ {
var p *Package
p = (*pack.Collection)[i]
// if the package has already been acked just skip
if p.Acked == true {
continue
}
err = pack.Consumer.ackPackage(p)
if err != nil {
break
}
p.Acked = true
}
return
}
// Ack removes the packages from the queue
func (pack *Package) Ack() error {
if pack.Collection != nil {
return fmt.Errorf("cannot Ack package in multi package answer")
}
err := pack.Consumer.ackPackage(pack)
return err
}
// Requeue moves a package back to input
func (pack *Package) Requeue() error {
return pack.reject(true)
}
// Fail moves a package to the failed queue
func (pack *Package) Fail() error {
return pack.reject(false)
}
func (pack *Package) reject(requeue bool) error {
if pack.Collection != nil && (*pack.Collection)[pack.index()-1].Acked == false {
return fmt.Errorf("cannot reject package while unacked package before it")
}
if !requeue {
err := pack.Consumer.failPackage(pack)
return err
}
err := pack.Consumer.requeuePackage(pack)
return err
}