Skip to content

Commit

Permalink
Queue handling of messages and contact stops directly to celery
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Aug 30, 2017
1 parent 516c941 commit 24e035c
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 132 deletions.
19 changes: 9 additions & 10 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@ func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgSta

// StopMsgContact marks the contact for the passed in msg as stopped, that is they no longer want to receive messages
func (b *backend) StopMsgContact(m courier.Msg) {
rc := b.redisPool.Get()
defer rc.Close()

dbMsg := m.(*DBMsg)
b.notifier.addStopContactNotification(dbMsg.ContactID_)
queueStopContact(rc, dbMsg.OrgID_, dbMsg.ContactID_)
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -167,7 +170,7 @@ func (b *backend) WriteMsgStatus(status courier.MsgStatus) error {
// we pipeline the removals because we don't care about the return value
rc.Send("srem", dateKey, status.ID().String())
rc.Send("srem", prevDateKey, status.ID().String())
err := rc.Flush()
_, err := rc.Do("")
if err != nil {
logrus.WithError(err).WithField("msg", status.ID().String()).Error("error clearing sent flags")
}
Expand Down Expand Up @@ -341,8 +344,10 @@ func (b *backend) Start() error {
log.Info("redis ok")
}

// start our dethrottler
queue.StartDethrottler(redisPool, b.stopChan, b.waitGroup, msgQueueName)
// start our dethrottler if we are going to be doing some sending
if b.config.MaxWorkers > 0 {
queue.StartDethrottler(redisPool, b.stopChan, b.waitGroup, msgQueueName)
}

// create our s3 client
s3Session, err := session.NewSession(&aws.Config{
Expand Down Expand Up @@ -373,10 +378,6 @@ func (b *backend) Start() error {
log.Info("spool directories ok")
}

// start our rapidpro notifier
b.notifier = newNotifier(b.config)
b.notifier.start(b)

// register and start our msg spool flushers
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "msgs"), b.flushMsgFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)
Expand Down Expand Up @@ -427,8 +428,6 @@ type backend struct {

popScript *redis.Script

notifier *notifier

stopChan chan bool
waitGroup *sync.WaitGroup
}
3 changes: 2 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ func (ts *BackendTestSuite) TestContactURN() {
wait.Done()
}()
wait.Wait()

ts.NotNil(contact2)
ts.NotNil(contact3)
ts.Equal(contact2.ID.Int64, contact3.ID.Int64)
ts.Equal(contact2.URNID.Int64, contact3.URNID.Int64)
}
Expand Down
3 changes: 3 additions & 0 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func contactForURN(db *sqlx.DB, org OrgID, channelID courier.ChannelID, urn cour
contact.UUID = uuid.NewV4().String()
contact.CreatedOn = time.Now()
contact.ModifiedOn = time.Now()
contact.IsNew = true

// TODO: don't set name for anonymous orgs
if name != "" {
Expand Down Expand Up @@ -129,4 +130,6 @@ type DBContact struct {

CreatedBy int `db:"created_by_id"`
ModifiedBy int `db:"modified_by_id"`

IsNew bool
}
12 changes: 10 additions & 2 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,17 @@ func writeMsgToDB(b *backend, m *DBMsg) error {
}

// queue this up to be handled by RapidPro
b.notifier.addHandleMsgNotification(m.ID_)
rc := b.redisPool.Get()
defer rc.Close()
err = queueMsgHandling(rc, m.OrgID_, m.ContactID_, m.ID_, contact.IsNew)

return err
// if we had a problem queueing the handling, log it, but our message is written, it'll
// get picked up by our rapidpro catch-all after a period
if err != nil {
logrus.WithError(err).WithField("msg_id", m.ID_.Int64).Error("error queueing msg handling")
}

return nil
}

const selectMsgSQL = `
Expand Down
117 changes: 0 additions & 117 deletions backends/rapidpro/notifier.go

This file was deleted.

53 changes: 53 additions & 0 deletions backends/rapidpro/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package rapidpro

import (
"encoding/json"
"fmt"
"time"

"github.com/garyburd/redigo/redis"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/celery"
)

func queueTask(rc redis.Conn, queueName string, taskName string, orgID OrgID, contactID ContactID, body map[string]interface{}) (err error) {
// encode our body
bodyJSON, err := json.Marshal(body)
if err != nil {
return err
}

now := time.Now().UTC()
epochFraction := float64(now.UnixNano()) / float64(time.Second)

// we do all our queueing in a transaction
rc.Send("multi")
rc.Send("zadd", fmt.Sprintf("ch:%d", contactID.Int64), fmt.Sprintf("%.5f", epochFraction), bodyJSON)
rc.Send("zadd", fmt.Sprintf("%s:%d", taskName, orgID.Int64), fmt.Sprintf("%.5f", epochFraction-10000000), bodyJSON)
rc.Send("zincrby", fmt.Sprintf("%s:active", taskName), 0, orgID.Int64)
celery.QueueEmptyTask(rc, queueName, taskName)
_, err = rc.Do("exec")

return err
}

func queueMsgHandling(rc redis.Conn, orgID OrgID, contactID ContactID, msgID courier.MsgID, newContact bool) error {
body := map[string]interface{}{
"type": "msg",
"id": msgID.Int64,
"contact_id": contactID.Int64,
"new_message": true,
"new_contact": newContact,
}

return queueTask(rc, "handler", "handle_event_task", orgID, contactID, body)
}

func queueStopContact(rc redis.Conn, orgID OrgID, contactID ContactID) error {
body := map[string]interface{}{
"type": "stop_contact",
"contact_id": contactID.Int64,
}

return queueTask(rc, "handler", "handle_event_task", orgID, contactID, body)
}
90 changes: 90 additions & 0 deletions celery/celery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package celery

import (
"encoding/base64"
"encoding/json"
"fmt"

"github.com/garyburd/redigo/redis"
"github.com/satori/go.uuid"
)

// allows queuing a task to celery (with a redis backend)
//
// format to queue a new task to the queue named "handler" at normal priority is:
// "SADD" "_kombu.binding.handler" "handler\x06\x16\x06\x16handler"
// "LPUSH" "handler" "{\"body\": \"W1tdLCB7fSwgeyJjaG9yZCI6IG51bGwsICJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsfV0=\",
// \"headers\": {\"origin\": \"[email protected]\", \"root_id\": \"adc1f782-c356-4aa1-acc8-238c2b348cac\", \"expires\": null,
// \"id\": \"adc1f782-c356-4aa1-acc8-238c2b348cac\", \"kwargsrepr\": \"{}\", \"lang\": \"py\", \"retries\": 0, \"task\": \"handle_event_task\",
// \"group\": null, \"timelimit\": [null, null], \"parent_id\": null, \"argsrepr\": \"()\", \"eta\": null}, \"content-type\": \"application/json\",
// \"properties\": {\"priority\": 0, \"body_encoding\": \"base64\", \"correlation_id\": \"adc1f782-c356-4aa1-acc8-238c2b348cac\",
// \"reply_to\": \"ec9440ce-1983-3e62-958b-65241f83235b\", \"delivery_info\": {\"routing_key\": \"handler\", \"exchange\": \"\"},
// \"delivery_mode\": 2, \"delivery_tag\": \"6e43def1-ed8e-4d06-93c5-9ec9a4695eb0\"}, \"content-encoding\": \"utf-8\"}"

const defaultBody = `[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]`

// QueueEmptyTask queues a new empty task with the passed in task name for the passed in queue
func QueueEmptyTask(rc redis.Conn, queueName string, taskName string) error {
body := base64.StdEncoding.EncodeToString([]byte(defaultBody))
taskUUID := uuid.NewV4().String()

task := Task{
Body: body,
Headers: map[string]interface{}{
"root_id": taskUUID,
"id": taskUUID,
"lang": "py",
"kwargsrepr": "{}",
"argsrepr": "()",
"task": taskName,
"expires": nil,
},
ContentType: "application/json",
Properties: TaskProperties{
BodyEncoding: "base64",
CorrelationID: taskUUID,
ReplyTo: uuid.NewV4().String(),
DeliveryMode: 2,
DeliveryTag: uuid.NewV4().String(),
DeliveryInfo: TaskDeliveryInfo{
RoutingKey: queueName,
},
},
ContentEncoding: "utf-8",
}

taskJSON, err := json.Marshal(task)
if err != nil {
return err
}

rc.Send("sadd", fmt.Sprintf("_kombu.binding.%s", queueName), fmt.Sprintf("%s\x06\x16\x06\x16%s", queueName, queueName))
rc.Send("lpush", queueName, string(taskJSON))
return nil
}

// Task is the outer struct for a celery task
type Task struct {
Body string `json:"body"`
Headers map[string]interface{} `json:"headers"`
ContentType string `json:"content-type"`
Properties TaskProperties `json:"properties"`
ContentEncoding string `json:"content-encoding"`
}

// TaskProperties is the struct for a task's properties
type TaskProperties struct {
BodyEncoding string `json:"body_encoding"`
CorrelationID string `json:"correlation_id"`
ReplyTo string `json:"replay_to"`
DeliveryInfo TaskDeliveryInfo `json:"delivery_info"`
DeliveryMode int `json:"delivery_mode"`
DeliveryTag string `json:"delivery_tag"`
}

// TaskDeliveryInfo is the struct for a task's delivery information
type TaskDeliveryInfo struct {
Priority int `json:"priority"`
RoutingKey string `json:"routing_key"`
Exchange string `json:"exchange"`
}
Loading

0 comments on commit 24e035c

Please sign in to comment.