Skip to content

Commit

Permalink
Merge branch 'main' into refactor/vtex-ads
Browse files Browse the repository at this point in the history
  • Loading branch information
Robi9 authored Sep 26, 2024
2 parents 6d087f8 + f557b51 commit 0a30cbd
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 8 deletions.
33 changes: 33 additions & 0 deletions WENI-CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,36 @@
1.34.6-mailroom-7.1.22
----------
* Fix brain to only send attachments when entry is "@input.text"

1.34.5-mailroom-7.1.22
----------
* Fix open wenichats on open ticket to handle properly for contact without preferred urn

1.34.4-mailroom-7.1.22
----------
* Fix vtex intelligent search url

1.34.3-mailroom-7.1.22
----------
* Allow locale query param on vtex intelligent search
* Update goflow for v0.14.2-goflow-0.144.3

1.34.2-mailroom-7.1.22
----------
* Update goflow for v0.14.1-goflow-0.144.3

1.34.1-mailroom-7.1.22
----------
* Handle invalid vtex api search type

1.34.0-mailroom-7.1.22
----------
* Handle brain flowstart msg event with order

1.33.1-mailroom-7.1.22
----------
* Return call result if cart simulation fails

1.33.0-mailroom-7.1.22
----------
* Update goflow to v0.14.0-goflow-0.144.3
Expand Down
109 changes: 109 additions & 0 deletions core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@ package runner

import (
"context"
"encoding/json"
"time"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/triggers"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/librato"
"github.com/nyaruka/mailroom/core/goflow"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/runtime/metrics"
"github.com/nyaruka/mailroom/utils/locker"
"github.com/nyaruka/null"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -141,6 +145,70 @@ func ResumeFlow(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets,
return session, nil
}

// The params that weni brain send when starting a flow
type brainStartParams struct {
Message string `json:"message"`
MsgEvent startMsgEvent `json:"msg_event"`
}

// A MsgEvent that weni brain send as a param to start a flow, attachments and metadata are manually handled
type startMsgEvent struct {
ContactID models.ContactID `json:"contact_id,omitempty"`
OrgID models.OrgID `json:"org_id,omitempty"`
ChannelID models.ChannelID `json:"channel_id,omitempty"`
MsgID flows.MsgID `json:"msg_id,omitempty"`
MsgUUID flows.MsgUUID `json:"msg_uuid,omitempty"`
MsgExternalID null.String `json:"msg_external_id,omitempty"`
URN urns.URN `json:"urn,omitempty"`
URNID models.URNID `json:"urn_id,omitempty"`
Text string `json:"text,omitempty"`
Attachments map[string]string `json:"attachments,omitempty"`
Metadata *startMetadata `json:"metadata,omitempty"`
}

func (m *startMsgEvent) GetAttachments() []utils.Attachment {
attachments := make([]utils.Attachment, 0, len(m.Attachments))
for _, v := range m.Attachments {
attachments = append(attachments, utils.Attachment(v))
}
return attachments
}

func (m *startMsgEvent) Valid() bool {
return m.ContactID != 0 &&
m.OrgID != 0 &&
m.ChannelID != 0 &&
m.MsgID != 0 &&
m.MsgUUID != "" &&
m.URN != "" &&
m.URNID != 0
}

type startMetadata struct {
Order *startOrder `json:"order,omitempty"`
}

type startOrder struct {
CatalogID string `json:"catalog_id,omitempty"`
Text string `json:"text,omitempty"`
ProductItems map[string]flows.ProductItem `json:"product_items,omitempty"`
}

func (x *startOrder) toOrder() *flows.Order {
order := &flows.Order{}
order.CatalogID = x.CatalogID
order.Text = x.Text

if x.ProductItems != nil {
order.ProductItems = make([]flows.ProductItem, 0, len(x.ProductItems))
for _, v := range x.ProductItems {
order.ProductItems = append(order.ProductItems, v)
}
}

return order
}

// StartFlowBatch starts the flow for the passed in org, contacts and flow
func StartFlowBatch(
ctx context.Context, rt *runtime.Runtime,
Expand Down Expand Up @@ -191,6 +259,20 @@ func StartFlowBatch(
}
}

var brainStartMsgEvent *brainStartParams
if params != nil {
xMsgEvent, err := params.MarshalJSON()
if err != nil {
return nil, errors.Wrap(err, "unable to marshal JSON from flow start params")
}

brainStartMsgEvent = &brainStartParams{}
err = json.Unmarshal(xMsgEvent, brainStartMsgEvent)
if err != nil {
return nil, errors.Wrap(err, "unable to unmarshal JSON from flow start start")
}
}

var history *flows.SessionHistory
if len(batch.SessionHistory()) > 0 {
history, err = models.ReadSessionHistory(batch.SessionHistory())
Expand All @@ -212,6 +294,33 @@ func StartFlowBatch(
return tb.Build()
}

// if we have a message event sent from weni brain, we need to build a trigger for that
if brainStartMsgEvent != nil {
msgEvent := brainStartMsgEvent.MsgEvent

if msgEvent.Valid() {
channel := oa.ChannelByID(msgEvent.ChannelID)
msgIn := flows.NewMsgIn(msgEvent.MsgUUID, msgEvent.URN, channel.ChannelReference(), msgEvent.Text, msgEvent.GetAttachments())
msgIn.SetExternalID(string(msgEvent.MsgExternalID))
msgIn.SetID(msgEvent.MsgID)

if (msgEvent.Metadata != nil) {
if (msgEvent.Metadata.Order != nil) {
msgIn.SetOrder(msgEvent.Metadata.Order.toOrder())
}
}

// create a trigger that contains the incoming message from weni brain
tb := triggers.NewBuilder(oa.Env(), flow.FlowReference(), contact).Msg(msgIn)

if batch.Extra() != nil {
tb = tb.WithParams(params)
}

return tb.Build()
}
}

tb := triggers.NewBuilder(oa.Env(), flow.FlowReference(), contact).Manual()
if batch.Extra() != nil {
tb = tb.WithParams(params)
Expand Down
110 changes: 110 additions & 0 deletions core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner_test
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -171,6 +172,115 @@ func TestBatchStart(t *testing.T) {
}
}

func TestBatchStartWithOrderInExtra(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetAll)

// create a start object
testdata.InsertFlowStart(db, testdata.Org1, testdata.SingleMessage, nil)

// create our incoming message
msg := testdata.InsertIncomingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "Here's my order", models.MsgStatusHandled)

// and our batch object
contactIDs := []models.ContactID{testdata.Cathy.ID}

tcs := []struct {
Flow models.FlowID
Restart models.RestartParticipants
IncludeActive models.IncludeActive
Extra json.RawMessage
Msg string
Count int
TotalCount int
}{
{
Flow: testdata.InputOrderFlow.ID,
Restart: true,
IncludeActive: false,
Extra: json.RawMessage([]byte(fmt.Sprintf(`{
"message": "Here's my order",
"msg_event": {
"contact_urn": "%s",
"contact_id": %d,
"org_id": %d,
"channel_id": %d,
"msg_id": %d,
"msg_uuid": "%s",
"msg_external_id": "%s",
"urn": "%s",
"urn_id": %d,
"text": "%s",
"attachments": {
"0": "image:https://link.to/image.jpg"
},
"metadata": {
"order": {
"catalog_id": "1",
"text": "Here's my order",
"product_items": {
"0": {
"currency": "BRL",
"item_price": 10.99,
"product_retailer_id": "retailer_id_1",
"quantity": 2
}
}
}
}
}
}`, testdata.Cathy.URN,
testdata.Cathy.ID,
models.OrgID(1),
testdata.TwilioChannel.ID,
msg.ID(),
msg.UUID(),
msg.ExternalID(),
testdata.Cathy.URN,
testdata.Cathy.URNID,
msg.Text(),
))),
Msg: "Your order costs: 2 * 10.99 BRL",
Count: 1,
TotalCount: 1,
},
}

last := time.Now()

for i, tc := range tcs {
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, tc.Flow, tc.Restart, tc.IncludeActive).
WithContactIDs(contactIDs).
WithExtra(tc.Extra)
batch := start.CreateBatch(contactIDs, true, len(contactIDs))

sessions, err := runner.StartFlowBatch(ctx, rt, batch)
require.NoError(t, err)
assert.Equal(t, tc.Count, len(sessions), "%d: unexpected number of sessions created", i)

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowsession WHERE contact_id = ANY($1)
AND status = 'C' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL AND created_on > $2`, pq.Array(contactIDs), last).
Returns(tc.Count, "%d: unexpected number of sessions", i)

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowrun WHERE contact_id = ANY($1) and flow_id = $2
AND is_active = FALSE AND responded = TRUE AND org_id = 1 AND exit_type = 'C' AND status = 'C'
AND results IS NOT NULL AND path IS NOT NULL AND events IS NOT NULL
AND session_id IS NOT NULL`, pq.Array(contactIDs), tc.Flow).
Returns(tc.TotalCount, "%d: unexpected number of runs", i)

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM msgs_msg WHERE contact_id = ANY($1) AND text = $2 AND org_id = 1 AND status = 'Q'
AND queued_on IS NOT NULL AND direction = 'O' AND msg_type = 'F' AND channel_id = $3`,
pq.Array(contactIDs), tc.Msg, testdata.TwilioChannel.ID).
Returns(tc.TotalCount, "%d: unexpected number of messages", i)

last = time.Now()
}
}

func TestResume(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

Expand Down
2 changes: 2 additions & 0 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,12 +927,14 @@ func requestToRouter(event *MsgEvent, rtConfig *runtime.Config, projectUUID uuid
Text string `json:"text"`
Attachments []utils.Attachment `json:"attachments"`
Metadata json.RawMessage `json:"metadata"`
MsgEvent MsgEvent `json:"msg_event"`
}{
ProjectUUID: projectUUID,
ContactURN: event.URN,
Text: event.Text,
Attachments: event.Attachments,
Metadata: event.Metadata,
MsgEvent: *event,
}

var b io.Reader
Expand Down
Binary file modified mailroom_test.dump
Binary file not shown.
23 changes: 21 additions & 2 deletions services/external/weni/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *service) Call(session flows.Session, params assets.MsgCatalogParam, log
existingProductsIds, trace, err = CartSimulation(allProducts, sellerID, params.SearchUrl, postalCode_)
callResult.Traces = append(callResult.Traces, trace)
if err != nil {
return nil, err
return callResult, err
}
}

Expand Down Expand Up @@ -470,11 +470,30 @@ func VtexIntelligentSearch(searchUrl string, productSearch string) ([]string, []

traces := []*httpx.Trace{}

searchUrlParts := strings.Split(searchUrl, "?")
searchUrl = searchUrlParts[0]
queryParams := map[string][]string{}
var err error
if len(searchUrlParts) > 1 {
queryParams, err = url.ParseQuery(searchUrlParts[1])
if err != nil {
return nil, nil, err
}
}

query := url.Values{}
query.Add("query", productSearch)
query.Add("locale", "pt-BR")
query.Add("hideUnavailableItems", "true")

for key, value := range queryParams {
query.Add(key, value[0])
}

// add default pt-BR locale
if _, ok := queryParams["locale"]; !ok {
query.Add("locale", "pt-BR")
}

urlAfter := strings.TrimSuffix(searchUrl, "/")

url_ := fmt.Sprintf("%s?%s", urlAfter, query.Encode())
Expand Down
11 changes: 10 additions & 1 deletion services/tickets/wenichats/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,16 @@ func (s *service) Open(session flows.Session, topic *flows.Topic, body string, a

roomData.SectorUUID = s.sectorUUID
roomData.QueueUUID = string(topic.UUID())
roomData.Contact.URN = session.Contact().PreferredURN().URN().String()
preferredURN := session.Contact().PreferredURN()
if preferredURN != nil {
roomData.Contact.URN = preferredURN.URN().String()
} else {
urns := session.Contact().URNs()
if len(urns) == 0 {
return nil, errors.New("failed to open ticket, no urn found for contact")
}
roomData.Contact.URN = urns[0].String()
}
roomData.FlowUUID = session.Runs()[0].Flow().UUID()
roomData.Contact.Groups = groups

Expand Down
5 changes: 5 additions & 0 deletions services/tickets/wenichats/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,11 @@ func TestOpenAndForward(t *testing.T) {
assert.Equal(t, "9688d21d-95aa-4bed-afc7-f31b35731a3d", ticket.ExternalID())
assert.Equal(t, 2, len(logger3.Logs))
test.AssertSnapshot(t, "open_ticket_empty_body", logger3.Logs[0].Request)

session.Contact().ClearURNs()
_, err = svc.Open(session, defaultTopic, "{\"history_after\":\"2019-10-07 15:21:30\"}", nil, logger3.Log)

assert.Equal(t, "failed to open ticket, no urn found for contact", err.Error())
}

func TestCloseAndReopen(t *testing.T) {
Expand Down
Loading

0 comments on commit 0a30cbd

Please sign in to comment.