forked from cloudfoundry-attic/receptor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task_worker.go
132 lines (109 loc) · 3.15 KB
/
task_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package task_handler
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"regexp"
"github.com/cloudfoundry-incubator/cf_http"
"github.com/cloudfoundry-incubator/receptor/serialization"
Bbs "github.com/cloudfoundry-incubator/runtime-schema/bbs"
"github.com/cloudfoundry-incubator/runtime-schema/models"
"github.com/pivotal-golang/lager"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
)
// Tunables for the completed-task callback workers.
const (
	// MAX_RETRIES bounds how many times a single task's completion callback
	// is POSTed before the worker gives up on it.
	MAX_RETRIES = 3

	// POOL_SIZE is both the number of workers created by NewTaskWorkerPool
	// and the buffer capacity of the task queue those workers share.
	POOL_SIZE = 20
)
// NewTaskWorkerPool builds POOL_SIZE task workers that all receive from one
// shared task queue buffered to POOL_SIZE entries.
//
// It returns a runner for the whole group, created with grouper.NewParallel
// and os.Interrupt, together with the send-only side of the queue. Every
// worker is named "task-worker-<index>" and logs through a logger session of
// the same name.
func NewTaskWorkerPool(bbs Bbs.ReceptorBBS, logger lager.Logger) (ifrit.Runner, chan<- models.Task) {
	queue := make(chan models.Task, POOL_SIZE)
	workers := make(grouper.Members, POOL_SIZE)

	for idx := range workers {
		workerName := fmt.Sprintf("task-worker-%d", idx)

		member := &workers[idx]
		member.Name = workerName
		member.Runner = newTaskWorker(queue, bbs, logger.Session(workerName))
	}

	return grouper.NewParallel(os.Interrupt, workers), queue
}
// newTaskWorker returns a worker that reads tasks from taskQueue, talks to
// the given BBS and logs through logger. Each worker is given its own HTTP
// client from cf_http.NewClient.
func newTaskWorker(taskQueue <-chan models.Task, bbs Bbs.ReceptorBBS, logger lager.Logger) *taskWorker {
	worker := new(taskWorker)
	worker.taskQueue = taskQueue
	worker.bbs = bbs
	worker.logger = logger
	worker.httpClient = cf_http.NewClient()
	return worker
}
// taskWorker is a single member of the task worker pool. It receives tasks
// from a queue and delivers their completion callbacks over HTTP.
type taskWorker struct {
	// taskQueue is the receive-only queue the worker takes tasks from.
	taskQueue <-chan models.Task

	// bbs is used to mark tasks as resolving and to resolve them.
	bbs Bbs.ReceptorBBS

	// logger is the worker's logger.
	logger lager.Logger

	// httpClient performs the POST to each task's completion callback URL.
	httpClient *http.Client
}
// Run is the worker's main loop. It closes ready before entering the loop,
// then handles each task received from the queue until a value arrives on
// signals, at which point it logs "exited" and returns nil.
func (t *taskWorker) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
	t.logger.Debug("starting")
	close(ready)

	for running := true; running; {
		select {
		case <-signals:
			// Any signal stops the worker; fall out of the loop.
			running = false
		case completed := <-t.taskQueue:
			t.handleCompletedTask(completed)
		}
	}

	t.logger.Debug("exited")
	return nil
}
// closedConnectionPattern matches the transport error text that is treated as
// retryable when POSTing a completion callback. It is compiled once here
// rather than on every failed request.
var closedConnectionPattern = regexp.MustCompile("use of closed network connection")

// handleCompletedTask delivers a task's completion callback and, once the
// callback has been accepted, resolves the task in the BBS.
//
// Tasks without a CompletionCallbackURL are ignored. Otherwise the task is
// first marked as resolving; if that fails nothing is sent. The serialized
// task response is then POSTed as JSON to the callback URL, at most
// MAX_RETRIES times:
//
//   - a request error mentioning "use of closed network connection" is retried;
//   - any other request error aborts without resolving the task;
//   - a response status rejected by shouldResolve is retried;
//   - any other response status resolves the task and ends processing.
//
// When every attempt is used up, "callback-failed" is logged with the last
// status code seen (0 if no response was ever received). Errors are logged,
// never returned.
func (t *taskWorker) handleCompletedTask(task models.Task) {
	if task.CompletionCallbackURL == nil {
		return
	}

	logger := t.logger.WithData(lager.Data{"task-guid": task.TaskGuid})

	logger.Info("resolving-task")
	if err := t.bbs.ResolvingTask(logger, task.TaskGuid); err != nil {
		logger.Error("marking-task-as-resolving-failed", err)
		return
	}

	callbackURL := task.CompletionCallbackURL.String()
	logger = logger.WithData(lager.Data{"callback_url": callbackURL})

	// Named payload (not json) so the encoding/json package is not shadowed.
	payload, err := json.Marshal(serialization.TaskToResponse(task))
	if err != nil {
		logger.Error("marshalling-task-failed", err)
		return
	}

	var statusCode int
	for attempt := 0; attempt < MAX_RETRIES; attempt++ {
		// A fresh request (and body reader) is needed for every attempt.
		request, err := http.NewRequest("POST", callbackURL, bytes.NewReader(payload))
		if err != nil {
			logger.Error("building-request-failed", err)
			return
		}
		request.Header.Set("Content-Type", "application/json")

		response, err := t.httpClient.Do(request)
		if err != nil {
			if closedConnectionPattern.MatchString(err.Error()) {
				continue
			}
			logger.Error("doing-request-failed", err)
			return
		}

		statusCode = response.StatusCode
		// Only the status code is needed. Close the body right away (not
		// deferred, since this is a loop) so the connection is released
		// instead of leaking on every attempt.
		response.Body.Close()

		if !shouldResolve(statusCode) {
			continue
		}

		if err := t.bbs.ResolveTask(logger, task.TaskGuid); err != nil {
			logger.Error("resolving-task-failed", err)
			return
		}

		logger.Info("resolved-task", lager.Data{"status_code": statusCode})
		return
	}

	logger.Info("callback-failed", lager.Data{"status_code": statusCode})
}
// shouldResolve reports whether a callback response with the given HTTP
// status code lets the task be resolved. It returns false only for
// 503 Service Unavailable and 504 Gateway Timeout; every other value,
// including non-2xx codes, returns true.
func shouldResolve(status int) bool {
	unavailable := status == http.StatusServiceUnavailable
	timedOut := status == http.StatusGatewayTimeout
	return !(unavailable || timedOut)
}