Skip to content

Commit

Permalink
Merge pull request cyverse-de#1 from slr71/master
Browse files Browse the repository at this point in the history
CORE-9275: modified the job status listener so that it attempts to re…
  • Loading branch information
slr71 authored Aug 3, 2018
2 parents f020666 + d57aebc commit edb1efd
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,42 @@ var log = logrus.WithFields(logrus.Fields{
var (
cfgPath = flag.String("config", "", "Path to the configuration file.")
cfg *viper.Viper

client *messaging.Client
)

// JobUpdatePublisher is the interface for types that need to publish a job
// update.
type JobUpdatePublisher interface {
PublishJobUpdate(m *messaging.UpdateMessage) error
}

func update(client JobUpdatePublisher, state messaging.JobState, jobId string, hostname string, msg string) (*messaging.UpdateMessage, error) {
func update(publisher JobUpdatePublisher, state messaging.JobState, jobId string, hostname string, msg string) (*messaging.UpdateMessage, error) {
updateMessage := &messaging.UpdateMessage{
Job: messaging.JobDetails{InvocationID: jobId},
State: state,
Message: msg,
Sender: hostname,
}

err := client.PublishJobUpdate(updateMessage)
err := publisher.PublishJobUpdate(updateMessage)
if err == nil {
log.Infof("%s (%s) [%s]: %s", jobId, state, hostname, msg)
return updateMessage, nil
}

// The first attempt to record the message failed.
log.Errorf("failed to publish job status update: %s", err)

// Attempt to reestablish the connection.
log.Info("attempting to reestablish the messaging connection")
err = publisher.Reconnect()
if err != nil {
log.Error(err)
log.Errorf("unable to reestablish the messaging connection: %s", err)
return nil, err
}
log.Infof("%s (%s) [%s]: %s", jobId, state, hostname, msg)
return updateMessage, nil

// Attempt to record the message one more time.
err = publisher.PublishJobUpdate(updateMessage)
if err == nil {
log.Infof("%s (%s) [%s]: %s", jobId, state, hostname, msg)
return updateMessage, nil
}

log.Errorf("failed to publish job status update again - giving up: %s", err)
return nil, err
}

type MessagePost struct {
Expand All @@ -76,7 +87,7 @@ func getState(state string) (messaging.JobState, error) {
}
}

func postUpdate(w http.ResponseWriter, r *http.Request) {
func postUpdate(publisher JobUpdatePublisher, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
out := json.NewEncoder(w)

Expand Down Expand Up @@ -105,14 +116,14 @@ func postUpdate(w http.ResponseWriter, r *http.Request) {
return
}

msg, err := update(client, state, jobId, updateMessage.Hostname, updateMessage.Message)
msg, err := update(publisher, state, jobId, updateMessage.Hostname, updateMessage.Message)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Error(err)
out.Encode(map[string]string{
"error": err.Error(),
})
return
log.Fatal("failed to record a valid job status update - aborting")
}
out.Encode(msg)
}
Expand All @@ -130,10 +141,14 @@ func loadConfig(cfgPath string) {
}
}

func newRouter() *mux.Router {
func newRouter(publisher JobUpdatePublisher) *mux.Router {
r := mux.NewRouter()
r.Handle("/debug/vars", http.DefaultServeMux)
r.Path("/{uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}}/status").Methods("POST").HandlerFunc(postUpdate)
r.Path("/{uuid:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}}/status").Methods("POST").HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
postUpdate(publisher, w, r)
},
)

return r
}
Expand All @@ -144,16 +159,14 @@ func main() {

uri := cfg.GetString("amqp.uri")
exchange := cfg.GetString("amqp.exchange.name")
var err error
client, err = messaging.NewClient(uri, true)

publisher, err := NewDefaultJobUpdatePublisher(uri, exchange)
if err != nil {
log.Fatal(err)
}
defer client.Close()

client.SetupPublishing(exchange)
defer publisher.Close()

r := newRouter()
r := newRouter(publisher)

listenPortSpec := ":" + "60000"
log.Infof("Listening on %s", listenPortSpec)
Expand Down

0 comments on commit edb1efd

Please sign in to comment.