diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go
index 91d50b928..5e7d725cb 100644
--- a/backends/rapidpro/backend.go
+++ b/backends/rapidpro/backend.go
@@ -130,8 +130,11 @@ func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgSta
 
 // StopMsgContact marks the contact for the passed in msg as stopped, that is they no longer want to receive messages
 func (b *backend) StopMsgContact(m courier.Msg) {
+	rc := b.redisPool.Get()
+	defer rc.Close()
+
 	dbMsg := m.(*DBMsg)
-	b.notifier.addStopContactNotification(dbMsg.ContactID_)
+	queueStopContact(rc, dbMsg.OrgID_, dbMsg.ContactID_)
 }
 
 // WriteMsg writes the passed in message to our store
@@ -167,7 +170,7 @@ func (b *backend) WriteMsgStatus(status courier.MsgStatus) error {
 		// we pipeline the removals because we don't care about the return value
 		rc.Send("srem", dateKey, status.ID().String())
 		rc.Send("srem", prevDateKey, status.ID().String())
-		err := rc.Flush()
+		_, err := rc.Do("")
 		if err != nil {
 			logrus.WithError(err).WithField("msg", status.ID().String()).Error("error clearing sent flags")
 		}
@@ -341,8 +344,10 @@ func (b *backend) Start() error {
 		log.Info("redis ok")
 	}
 
-	// start our dethrottler
-	queue.StartDethrottler(redisPool, b.stopChan, b.waitGroup, msgQueueName)
+	// start our dethrottler if we are going to be doing some sending
+	if b.config.MaxWorkers > 0 {
+		queue.StartDethrottler(redisPool, b.stopChan, b.waitGroup, msgQueueName)
+	}
 
 	// create our s3 client
 	s3Session, err := session.NewSession(&aws.Config{
@@ -373,10 +378,6 @@ func (b *backend) Start() error {
 		log.Info("spool directories ok")
 	}
 
-	// start our rapidpro notifier
-	b.notifier = newNotifier(b.config)
-	b.notifier.start(b)
-
 	// register and start our msg spool flushers
 	courier.RegisterFlusher(path.Join(b.config.SpoolDir, "msgs"), b.flushMsgFile)
 	courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
@@ -427,8 +428,6 @@ type backend struct {
 
 	popScript *redis.Script
 
-	notifier *notifier
-
 	stopChan  chan bool
 	waitGroup *sync.WaitGroup
 }
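An aside on the WriteMsgStatus change above: redigo's Flush only writes the pipelined commands to the server, while Do with an empty command name flushes them and then reads the pending replies, so a failed round trip is actually reported and no unread replies are left behind on the pooled connection. A minimal sketch of the idiom, assuming a local redis and using made-up key names:

package main

import (
	"log"

	"github.com/garyburd/redigo/redis"
)

func main() {
	rc, err := redis.Dial("tcp", "localhost:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()

	// pipeline the removals, then flush and read both replies in one round trip;
	// the key names and msg id here are hypothetical, for illustration only
	rc.Send("srem", "sent:2017-07-01", "10001")
	rc.Send("srem", "sent:2017-06-30", "10001")
	if _, err := rc.Do(""); err != nil {
		log.Printf("error clearing sent flags: %s", err)
	}
}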
diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go
index 6d5c3324c..4897f2479 100644
--- a/backends/rapidpro/backend_test.go
+++ b/backends/rapidpro/backend_test.go
@@ -243,7 +243,8 @@ func (ts *BackendTestSuite) TestContactURN() {
 		wait.Done()
 	}()
 	wait.Wait()
-
+	ts.NotNil(contact2)
+	ts.NotNil(contact3)
 	ts.Equal(contact2.ID.Int64, contact3.ID.Int64)
 	ts.Equal(contact2.URNID.Int64, contact3.URNID.Int64)
 }
diff --git a/backends/rapidpro/contact.go b/backends/rapidpro/contact.go
index 7f3c17366..af60a45ae 100644
--- a/backends/rapidpro/contact.go
+++ b/backends/rapidpro/contact.go
@@ -66,6 +66,7 @@ func contactForURN(db *sqlx.DB, org OrgID, channelID courier.ChannelID, urn cour
 	contact.UUID = uuid.NewV4().String()
 	contact.CreatedOn = time.Now()
 	contact.ModifiedOn = time.Now()
+	contact.IsNew = true
 
 	// TODO: don't set name for anonymous orgs
 	if name != "" {
@@ -129,4 +130,6 @@ type DBContact struct {
 
 	CreatedBy  int `db:"created_by_id"`
 	ModifiedBy int `db:"modified_by_id"`
+
+	IsNew bool
 }
diff --git a/backends/rapidpro/msg.go b/backends/rapidpro/msg.go
index 839028eca..8021ce16d 100644
--- a/backends/rapidpro/msg.go
+++ b/backends/rapidpro/msg.go
@@ -144,9 +144,17 @@ func writeMsgToDB(b *backend, m *DBMsg) error {
 	}
 
 	// queue this up to be handled by RapidPro
-	b.notifier.addHandleMsgNotification(m.ID_)
+	rc := b.redisPool.Get()
+	defer rc.Close()
+	err = queueMsgHandling(rc, m.OrgID_, m.ContactID_, m.ID_, contact.IsNew)
 
-	return err
+	// if we had a problem queueing the handling, log it, but our message is written, it'll
+	// get picked up by our rapidpro catch-all after a period
+	if err != nil {
+		logrus.WithError(err).WithField("msg_id", m.ID_.Int64).Error("error queueing msg handling")
+	}
+
+	return nil
 }
 
 const selectMsgSQL = `
diff --git a/backends/rapidpro/notifier.go b/backends/rapidpro/notifier.go
deleted file mode 100644
index 44da6c0a8..000000000
--- a/backends/rapidpro/notifier.go
+++ /dev/null
@@ -1,117 +0,0 @@
-package rapidpro
-
-import (
-	"fmt"
-	"net/http"
-	"net/url"
-	"strings"
-	"time"
-
-	"github.com/nyaruka/courier"
-	"github.com/nyaruka/courier/config"
-	"github.com/nyaruka/courier/utils"
-	"github.com/sirupsen/logrus"
-)
-
-func notifyRapidPro(config *config.Courier, body url.Values) error {
-	// build our request
-	req, err := http.NewRequest("POST", fmt.Sprintf(config.RapidproHandleURL, body.Get("action")), strings.NewReader(body.Encode()))
-
-	// this really should never happen, but if it does we only log it
-	if err != nil {
-		logrus.WithField("comp", "notifier").WithError(err).Error("error creating request")
-		return nil
-	}
-
-	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-	req.Header.Set("Authorization", fmt.Sprintf("Token %s", config.RapidproToken))
-	_, err = utils.MakeHTTPRequest(req)
-
-	return err
-}
-
-func newNotifier(config *config.Courier) *notifier {
-	return &notifier{
-		config:        config,
-		notifications: make(chan url.Values, 100000), // TODO: is 100k enough?
-	}
-}
-
-func (n *notifier) addHandleMsgNotification(msgID courier.MsgID) {
-	body := url.Values{}
-	body.Add("action", "handle_message")
-	body.Add("message_id", msgID.String())
-	n.notifications <- body
-}
-
-func (n *notifier) addStopContactNotification(contactID ContactID) {
-	body := url.Values{}
-	body.Add("action", "stop_contact")
-	body.Add("contact_id", fmt.Sprintf("%d", contactID.Int64))
-	n.notifications <- body
-}
-
-func (n *notifier) start(backend *backend) {
-	go func() {
-		backend.waitGroup.Add(1)
-		defer backend.waitGroup.Done()
-
-		log := logrus.WithField("comp", "notifier")
-		log.WithField("state", "started").Info("notifier started")
-
-		lastError := false
-
-		for {
-			select {
-			case body := <-n.notifications:
-				// try to notify rapidpro
-				err := notifyRapidPro(n.config, body)
-
-				// we failed, append it to our retries
-				if err != nil {
-					if !lastError {
-						log.WithError(err).WithField("body", body).Error("error notifying rapidpro")
-					}
-					n.retries = append(n.retries, body)
-					lastError = true
-				} else {
-					lastError = false
-				}
-
-				// otherwise, all is well, move onto the next
-
-			case <-backend.stopChan:
-				// we are being stopped, exit
-				log.WithField("state", "stopped").Info("notifier stopped")
-				return
-
-			case <-time.After(500 * time.Millisecond):
-				// if we are quiet for 500ms, try to send some retries
-				retried := 0
-				for retried < 10 && retried < len(n.retries) {
-					body := n.retries[0]
-					n.retries = n.retries[1:]
-
-					err := notifyRapidPro(n.config, body)
-					if err != nil {
-						if !lastError {
-							log.WithError(err).Error("error notifying rapidpro")
-						}
-						n.retries = append(n.retries, body)
-						lastError = true
-					} else {
-						lastError = false
-					}
-
-					retried++
-				}
-			}
-		}
-	}()
-}
-
-type notifier struct {
-	config        *config.Courier
-	notifications chan url.Values
-	retries       []url.Values
-}
diff --git a/backends/rapidpro/task.go b/backends/rapidpro/task.go
new file mode 100644
index 000000000..e880a0e44
--- /dev/null
+++ b/backends/rapidpro/task.go
@@ -0,0 +1,53 @@
+package rapidpro
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/garyburd/redigo/redis"
+	"github.com/nyaruka/courier"
+	"github.com/nyaruka/courier/celery"
+)
+
+func queueTask(rc redis.Conn, queueName string, taskName string, orgID OrgID, contactID ContactID, body map[string]interface{}) (err error) {
+	// encode our body
+	bodyJSON, err := json.Marshal(body)
+	if err != nil {
+		return err
+	}
+
+	now := time.Now().UTC()
+	epochFraction := float64(now.UnixNano()) / float64(time.Second)
+
+	// we do all our queueing in a transaction
+	rc.Send("multi")
+	rc.Send("zadd", fmt.Sprintf("ch:%d", contactID.Int64), fmt.Sprintf("%.5f", epochFraction), bodyJSON)
+	rc.Send("zadd", fmt.Sprintf("%s:%d", taskName, orgID.Int64), fmt.Sprintf("%.5f", epochFraction-10000000), bodyJSON)
+	rc.Send("zincrby", fmt.Sprintf("%s:active", taskName), 0, orgID.Int64)
+	celery.QueueEmptyTask(rc, queueName, taskName)
+	_, err = rc.Do("exec")
+
+	return err
+}
+
+func queueMsgHandling(rc redis.Conn, orgID OrgID, contactID ContactID, msgID courier.MsgID, newContact bool) error {
+	body := map[string]interface{}{
+		"type":        "msg",
+		"id":          msgID.Int64,
+		"contact_id":  contactID.Int64,
+		"new_message": true,
+		"new_contact": newContact,
+	}
+
+	return queueTask(rc, "handler", "handle_event_task", orgID, contactID, body)
+}
+
+func queueStopContact(rc redis.Conn, orgID OrgID, contactID ContactID) error {
+	body := map[string]interface{}{
+		"type":       "stop_contact",
+		"contact_id": contactID.Int64,
+	}
+
+	return queueTask(rc, "handler", "handle_event_task", orgID, contactID, body)
+}
"argsrepr": "()", + "task": taskName, + "expires": nil, + }, + ContentType: "application/json", + Properties: TaskProperties{ + BodyEncoding: "base64", + CorrelationID: taskUUID, + ReplyTo: uuid.NewV4().String(), + DeliveryMode: 2, + DeliveryTag: uuid.NewV4().String(), + DeliveryInfo: TaskDeliveryInfo{ + RoutingKey: queueName, + }, + }, + ContentEncoding: "utf-8", + } + + taskJSON, err := json.Marshal(task) + if err != nil { + return err + } + + rc.Send("sadd", fmt.Sprintf("_kombu.binding.%s", queueName), fmt.Sprintf("%s\x06\x16\x06\x16%s", queueName, queueName)) + rc.Send("lpush", queueName, string(taskJSON)) + return nil +} + +// Task is the outer struct for a celery task +type Task struct { + Body string `json:"body"` + Headers map[string]interface{} `json:"headers"` + ContentType string `json:"content-type"` + Properties TaskProperties `json:"properties"` + ContentEncoding string `json:"content-encoding"` +} + +// TaskProperties is the struct for a task's properties +type TaskProperties struct { + BodyEncoding string `json:"body_encoding"` + CorrelationID string `json:"correlation_id"` + ReplyTo string `json:"replay_to"` + DeliveryInfo TaskDeliveryInfo `json:"delivery_info"` + DeliveryMode int `json:"delivery_mode"` + DeliveryTag string `json:"delivery_tag"` +} + +// TaskDeliveryInfo is the struct for a task's delivery information +type TaskDeliveryInfo struct { + Priority int `json:"priority"` + RoutingKey string `json:"routing_key"` + Exchange string `json:"exchange"` +} diff --git a/celery/celery_test.go b/celery/celery_test.go new file mode 100644 index 000000000..8653eb120 --- /dev/null +++ b/celery/celery_test.go @@ -0,0 +1,78 @@ +package celery + +import ( + "encoding/json" + "log" + "strings" + "testing" + "time" + + "github.com/garyburd/redigo/redis" +) + +func getPool() *redis.Pool { + redisPool := &redis.Pool{ + Wait: true, // makes callers wait for a connection + MaxActive: 5, // only open this many concurrent connections at once + MaxIdle: 2, // only keep up to 2 idle + IdleTimeout: 240 * time.Second, // how long to wait before reaping a connection + Dial: func() (redis.Conn, error) { + conn, err := redis.Dial("tcp", "localhost:6379") + if err != nil { + return nil, err + } + _, err = conn.Do("SELECT", 0) + return conn, err + }, + } + conn := redisPool.Get() + defer conn.Close() + + _, err := conn.Do("FLUSHDB") + if err != nil { + log.Fatal(err) + } + + return redisPool +} +func TestQueue(t *testing.T) { + pool := getPool() + defer pool.Close() + + conn := pool.Get() + defer conn.Close() + + // queue to our handler queue + conn.Send("multi") + err := QueueEmptyTask(conn, "handler", "handle_event_task") + if err != nil { + t.Error(err) + } + _, err = conn.Do("exec") + if err != nil { + t.Error(err) + } + + // check whether things look right + members, err := redis.Strings(conn.Do("smembers", "_kombu.binding.handler")) + if len(members) != 1 || !strings.HasPrefix(members[0], "handler") { + t.Errorf("handler queue has unexpected members: %s", members) + } + + taskJSON, err := redis.String(conn.Do("LPOP", "handler")) + if err != nil { + t.Errorf("should have value in handler queue: %s", err) + } + + // make sure our task is valid json + task := Task{} + err = json.Unmarshal([]byte(taskJSON), &task) + if err != nil { + t.Errorf("should be JSON: %s", err) + } + + // and is against the right queue + if task.Properties.DeliveryInfo.RoutingKey != "handler" { + t.Errorf("task should have handler as routing key") + } +} diff --git a/server.go b/server.go index f885a3418..2519c2181 
diff --git a/celery/celery_test.go b/celery/celery_test.go
new file mode 100644
index 000000000..8653eb120
--- /dev/null
+++ b/celery/celery_test.go
@@ -0,0 +1,78 @@
+package celery
+
+import (
+	"encoding/json"
+	"log"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/garyburd/redigo/redis"
+)
+
+func getPool() *redis.Pool {
+	redisPool := &redis.Pool{
+		Wait:        true,              // makes callers wait for a connection
+		MaxActive:   5,                 // only open this many concurrent connections at once
+		MaxIdle:     2,                 // only keep up to 2 idle
+		IdleTimeout: 240 * time.Second, // how long to wait before reaping a connection
+		Dial: func() (redis.Conn, error) {
+			conn, err := redis.Dial("tcp", "localhost:6379")
+			if err != nil {
+				return nil, err
+			}
+			_, err = conn.Do("SELECT", 0)
+			return conn, err
+		},
+	}
+	conn := redisPool.Get()
+	defer conn.Close()
+
+	_, err := conn.Do("FLUSHDB")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	return redisPool
+}
+func TestQueue(t *testing.T) {
+	pool := getPool()
+	defer pool.Close()
+
+	conn := pool.Get()
+	defer conn.Close()
+
+	// queue to our handler queue
+	conn.Send("multi")
+	err := QueueEmptyTask(conn, "handler", "handle_event_task")
+	if err != nil {
+		t.Error(err)
+	}
+	_, err = conn.Do("exec")
+	if err != nil {
+		t.Error(err)
+	}
+
+	// check whether things look right
+	members, err := redis.Strings(conn.Do("smembers", "_kombu.binding.handler"))
+	if len(members) != 1 || !strings.HasPrefix(members[0], "handler") {
+		t.Errorf("handler queue has unexpected members: %s", members)
+	}
+
+	taskJSON, err := redis.String(conn.Do("LPOP", "handler"))
+	if err != nil {
+		t.Errorf("should have value in handler queue: %s", err)
+	}
+
+	// make sure our task is valid json
+	task := Task{}
+	err = json.Unmarshal([]byte(taskJSON), &task)
+	if err != nil {
+		t.Errorf("should be JSON: %s", err)
+	}
+
+	// and is against the right queue
+	if task.Properties.DeliveryInfo.RoutingKey != "handler" {
+		t.Errorf("task should have handler as routing key")
+	}
+}
diff --git a/server.go b/server.go
index f885a3418..2519c2181 100644
--- a/server.go
+++ b/server.go
@@ -290,7 +290,7 @@ func (s *server) channelUpdateStatusWrapper(handler ChannelHandler, handlerFunc
 
 		// we received an error, write it out and report it
 		if err != nil {
-			logrus.WithError(err).WithField("url", url).WithField("request", request).Error("error receiving status")
+			logrus.WithError(err).WithField("url", url).WithField("request", string(request)).Error("error receiving status")
 			WriteError(ww, r, err)
 		}
 
@@ -339,7 +339,7 @@ func (s *server) channelReceiveMsgWrapper(handler ChannelHandler, handlerFunc Ch
 
 		// if we received an error, write it out and report it
 		if err != nil {
-			logrus.WithError(err).WithField("url", url).WithField("request", request).Error("error receiving message")
+			logrus.WithError(err).WithField("url", url).WithField("request", string(request)).Error("error receiving message")
 			WriteError(ww, r, err)
 		}
 
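Lastly, on the server.go change: request there is presumably the dumped request as a byte slice, and logrus's default formatter falls back to fmt-style rendering for non-string values, which turns a []byte into a list of decimal byte values. Wrapping it in string() is what makes the logged request readable. A tiny sketch of the difference, with a made-up request line:

package main

import "fmt"

func main() {
	// hypothetical dumped request bytes
	request := []byte("POST /c/kn/1234/receive?text=hi")

	fmt.Printf("%v\n", request)         // [80 79 83 84 32 47 ...]
	fmt.Printf("%v\n", string(request)) // POST /c/kn/1234/receive?text=hi
}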