Skip to content

Commit

Permalink
WTEL-5623
Browse files Browse the repository at this point in the history
  • Loading branch information
i.navrotskyj committed Dec 4, 2024
1 parent ef1fcb7 commit b3b4e68
Show file tree
Hide file tree
Showing 19 changed files with 364 additions and 118 deletions.
9 changes: 8 additions & 1 deletion queue/attempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Attempt struct {
}

func NewAttempt(ctx context.Context, member *model.MemberAttempt, log *wlog.Logger) *Attempt {
return &Attempt{
a := &Attempt{
state: model.MemberStateIdle,
member: member,
Context: ctx,
Expand All @@ -100,6 +100,13 @@ func NewAttempt(ctx context.Context, member *model.MemberAttempt, log *wlog.Logg
wlog.String("name", member.Name),
),
}
if member.MemberId != nil {
a.log = a.log.With(
wlog.Int64("member_id", *member.MemberId),
)
}

return a
}

// Change attempt settings
Expand Down
10 changes: 7 additions & 3 deletions queue/call_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,19 @@ top:
if agentCall.TransferTo() != nil && agentCall.TransferToAgentId() != nil && agentCall.TransferFromAttemptId() != nil {
attempt.Log("receive transfer queue")
if nc, err := queue.GetTransferredCall(*agentCall.TransferTo()); err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
} else {
if nc.HangupAt() == 0 {
if newA, err := queue.queueManager.TransferFrom(team, attempt, *agentCall.TransferFromAttemptId(),
*agentCall.TransferToAgentId(), *agentCall.TransferTo(), nc); err == nil {
agent = newA
attempt.Log(fmt.Sprintf("transfer call from [%s] to [%s] AGENT_ID = %s {%d, %d}", agentCall.Id(), nc.Id(), newA.Name(), attempt.Id(), *agentCall.TransferFromAttemptId()))
} else {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

agentCall = nc
Expand Down Expand Up @@ -155,7 +159,7 @@ top:
}

if agentCall != nil && agentCall.HangupAt() == 0 {
wlog.Warn(fmt.Sprintf("agent call %s no hangup", agentCall.Id()))
attempt.log.Warn(fmt.Sprintf("agent call %s no hangup", agentCall.Id()))
}

if agentCall != nil && agentCall.BridgeAt() > 0 {
Expand Down
28 changes: 20 additions & 8 deletions queue/call_inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
attempt.SetState(model.MemberStateJoined)
attempt.Log("wait agent")
if err = queue.queueManager.SetFindAgentState(attempt.Id()); err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
//todo
return
}
Expand Down Expand Up @@ -98,14 +100,18 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
calling = false
break
} else {
wlog.Debug(fmt.Sprintf("[%d] change call state to %s", attempt.Id(), c))
attempt.log.Debug(fmt.Sprintf("[%d] change call state to %s", attempt.Id(), c),
wlog.String("state", c.String()),
)
}

case <-ags:
agent = attempt.Agent()
team, err = queue.GetTeam(attempt)
if err != nil {
wlog.Error(err.Error()) // todo
attempt.log.Error(err.Error(),
wlog.Err(err),
) // todo
time.Sleep(time.Second * 3)
continue
}
Expand Down Expand Up @@ -143,7 +149,7 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
team.Distribute(queue, agent, NewDistributeEvent(attempt, agent.UserId(), queue, agent, queue.Processing(), mCall, agentCall))
agentCall.Invite()

wlog.Debug(fmt.Sprintf("call [%s] && agent [%s]", mCall.Id(), agentCall.Id()))
attempt.log.Debug(fmt.Sprintf("call [%s] && agent [%s]", mCall.Id(), agentCall.Id()))

top:
for calling && agentCall.HangupCause() == "" && (mCall.HangupCause() == "") {
Expand Down Expand Up @@ -200,15 +206,19 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
if agentCall.TransferTo() != nil && agentCall.TransferToAgentId() != nil && agentCall.TransferFromAttemptId() != nil {
attempt.Log("receive transfer queue")
if nc, err := queue.GetTransferredCall(*agentCall.TransferTo()); err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
} else {
if nc.HangupAt() == 0 {
if newA, err := queue.queueManager.TransferFrom(team, attempt, *agentCall.TransferFromAttemptId(),
*agentCall.TransferToAgentId(), *agentCall.TransferTo(), nc); err == nil {
agent = newA
attempt.Log(fmt.Sprintf("transfer call from [%s] to [%s] AGENT_ID = %s {%d, %d}", agentCall.Id(), nc.Id(), newA.Name(), attempt.Id(), *agentCall.TransferFromAttemptId()))
} else {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

agentCall = nc
Expand Down Expand Up @@ -265,7 +275,7 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
}

if agentCall != nil && agentCall.HangupAt() == 0 {
wlog.Warn(fmt.Sprintf("agent call %s no hangup", agentCall.Id()))
attempt.log.Warn(fmt.Sprintf("agent call %s no hangup", agentCall.Id()))
}

if agentCall != nil && agentCall.BridgeAt() > 0 {
Expand All @@ -277,7 +287,9 @@ func (queue *InboundQueue) run(attempt *Attempt, mCall call_manager.Call) {
if mCall.HangupAt() == 0 && mCall.BridgeAt() == 0 {
err = mCall.StopPlayback()
if err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}
}

Expand Down
13 changes: 10 additions & 3 deletions queue/call_ivr.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,13 @@ func (queue *IVRQueue) run(attempt *Attempt) {

call.Invite()
if call.Err() != nil {
// TODO
return
}

wlog.Debug(fmt.Sprintf("calling %s for member %s attemptId %v", call.Id(), attempt.Name(), attempt.Id()))
attempt.log.Debug(fmt.Sprintf("calling %s for member %s attemptId %v", call.Id(), attempt.Name(), attempt.Id()),
wlog.String("call_id", call.Id()),
)

var calling = true

Expand All @@ -175,7 +178,9 @@ func (queue *IVRQueue) run(attempt *Attempt) {
_, err := queue.queueManager.store.Member().
SetAttemptOffering(attempt.Id(), nil, nil, model.NewString(call.Id()), &dst, &callerIdNumber)
if err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

case call_manager.CALL_STATE_DETECT_AMD, call_manager.CALL_STATE_ACCEPT:
Expand All @@ -188,7 +193,9 @@ func (queue *IVRQueue) run(attempt *Attempt) {
attempt.SetState(model.MemberStateBridged)
_, err := queue.queueManager.store.Member().SetAttemptBridged(attempt.Id())
if err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

case call_manager.CALL_STATE_HANGUP:
Expand Down
3 changes: 1 addition & 2 deletions queue/call_offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/webitel/call_center/agent_manager"
"github.com/webitel/call_center/call_manager"
"github.com/webitel/call_center/model"
"github.com/webitel/wlog"
)

type OfflineQueueSettings struct {
Expand Down Expand Up @@ -188,7 +187,7 @@ func (queue *OfflineCallQueue) run(team *agentTeam, attempt *Attempt, agent agen
}

if call.BillSeconds() > 0 || call.AcceptAt() > 0 { //FIXME Accept or Bridge ?
wlog.Debug(fmt.Sprintf("attempt[%d] reporting...", attempt.Id()))
attempt.log.Debug(fmt.Sprintf("attempt[%d] reporting...", attempt.Id()))
team.Reporting(queue, attempt, agent, call.ReportingAt() > 0, call.Transferred())
} else {
team.Missed(attempt, agent)
Expand Down
8 changes: 6 additions & 2 deletions queue/call_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,18 @@ func (queue *PreviewCallQueue) run(team *agentTeam, attempt *Attempt, agent agen
if call.TransferTo() != nil && call.TransferToAgentId() != nil && call.TransferFromAttemptId() != nil {
attempt.Log("receive transfer")
if nc, err := queue.GetTransferredCall(*call.TransferTo()); err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
} else {
if nc.HangupAt() == 0 {
if newA, err := queue.queueManager.TransferFrom(team, attempt, *call.TransferFromAttemptId(), *call.TransferToAgentId(), *call.TransferTo(), nc); err == nil {
agent = newA
attempt.Log(fmt.Sprintf("transfer call from [%s] to [%s] AGENT_ID = %s {%d, %d}", call.Id(), nc.Id(), newA.Name(), attempt.Id(), *call.TransferFromAttemptId()))
} else {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

call = nc
Expand Down
15 changes: 11 additions & 4 deletions queue/call_progressive.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (queue *ProgressiveCallQueue) run(attempt *Attempt, team *agentTeam, agent
}

printfIfErr(agentCall.Invite())
wlog.Debug(fmt.Sprintf("call [%s] && agent [%s]", mCall.Id(), agentCall.Id()))
attempt.log.Debug(fmt.Sprintf("call [%s] && agent [%s]", mCall.Id(), agentCall.Id()),
wlog.String("call_id", mCall.Id()),
wlog.String("agent_call_id", agentCall.Id()),
)

top:
for agentCall.HangupCause() == "" && mCall.HangupCause() == "" && agentCall.TransferTo() == nil {
Expand Down Expand Up @@ -266,15 +269,19 @@ func (queue *ProgressiveCallQueue) run(attempt *Attempt, team *agentTeam, agent
if agentCall.TransferTo() != nil && agentCall.TransferToAgentId() != nil && agentCall.TransferFromAttemptId() != nil {
attempt.Log("receive transfer")
if nc, err := queue.GetTransferredCall(*agentCall.TransferTo()); err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
} else {
if nc.HangupAt() == 0 {
if newA, err := queue.queueManager.TransferFrom(team, attempt, *agentCall.TransferFromAttemptId(), *agentCall.TransferToAgentId(), *agentCall.TransferTo(), nc); err == nil {
agent = newA
attempt.Log(fmt.Sprintf("transfer call from [%s] to [%s] AGENT_ID = %s {%d, %d}", agentCall.Id(), nc.Id(), newA.Name(), attempt.Id(), *agentCall.TransferFromAttemptId()))
//transferred = true
} else {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
}

agentCall = nc
Expand Down Expand Up @@ -332,7 +339,7 @@ func (queue *ProgressiveCallQueue) run(attempt *Attempt, team *agentTeam, agent
queue.queueManager.LeavingMember(attempt)
} else {
if agentCall.BridgeAt() > 0 { //FIXME Accept or Bridge ?
wlog.Debug(fmt.Sprintf("attempt[%d] reporting...", attempt.Id()))
attempt.log.Debug(fmt.Sprintf("attempt[%d] reporting...", attempt.Id()))
team.Reporting(queue, attempt, agent, agentCall.ReportingAt() > 0, agentCall.Transferred())
} else {
if agentCall.HangupAt() == 0 && agentCall.TransferTo() == nil && mCall.HangupAt() > 0 {
Expand Down
8 changes: 6 additions & 2 deletions queue/chat_inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func (queue *InboundChatQueue) process(attempt *Attempt, inviterId, invUserId st
agent = attempt.Agent()
team, err = queue.GetTeam(attempt)
if err != nil {
wlog.Error(err.Error())
attempt.log.Error(err.Error(),
wlog.Err(err),
)
return
}
attempt.Log(fmt.Sprintf("distribute agent %s [%d]", agent.Name(), agent.Id()))
Expand Down Expand Up @@ -177,7 +179,9 @@ func (queue *InboundChatQueue) process(attempt *Attempt, inviterId, invUserId st
aSess = conv.LastSession()
team.Distribute(queue, agent, NewDistributeEvent(attempt, agent.UserId(), queue, agent, queue.Processing(), mSess, aSess))

wlog.Debug(fmt.Sprintf("conversation [%s] && agent [%s]", conv.MemberSession().Id(), conv.LastSession().Id()))
attempt.log.Debug(fmt.Sprintf("conversation [%s] && agent [%s]", conv.MemberSession().Id(), conv.LastSession().Id()),
wlog.String("conversation_id", conv.MemberSession().Id()),
)

top:
for conv.Active() && aSess.StopAt() == 0 { //
Expand Down
18 changes: 13 additions & 5 deletions queue/dialing.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,25 @@ func (d *DialingImpl) routeIdleAttempts() {
err = d.queueManager.mq.AgentChannelEvent(v.Channel, v.DomainId, 0, v.UserId, waiting)
}
} else {
wlog.Error(err.Error()) ///TODO return ?
d.log.Error(err.Error(),
wlog.Err(err),
) ///TODO return ?
}

members, err := d.store.Member().GetActiveMembersAttempt(d.app.GetInstanceId())
if err != nil {
wlog.Error(err.Error())
d.log.Error(err.Error(),
wlog.Err(err),
)
time.Sleep(time.Second)
return
}

for _, v := range members {
if v.MemberId == nil {
wlog.Warn(fmt.Sprintf("Attempt=%d is canceled", v.Id))
d.log.Warn(fmt.Sprintf("Attempt=%d is canceled", v.Id),
wlog.Int64("attempt_id", v.Id),
)
continue
}
v.CreatedAt = time.Now()
Expand Down Expand Up @@ -130,11 +136,13 @@ func (d *DialingImpl) routeIdleAgents() {
} else {
// TODO
d.queueManager.store.Member().SetTimeoutError(v.AttemptId)
wlog.Error("attempt[%d] error: not found in cache, set timeout error")
d.log.Error("attempt[%d] error: not found in cache, set timeout error")
}
}
} else {
wlog.Error(err.Error()) ///TODO return ?
d.log.Error(err.Error(),
wlog.Err(err),
) ///TODO return ?
}
// FIXME engine
if hists, err := d.store.Member().SaveToHistory(); err == nil {
Expand Down
25 changes: 20 additions & 5 deletions queue/expired.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,29 @@ type ExpiredManager struct {
watcher *utils.Watcher
startOnce sync.Once
pool *utils.Pool
log *wlog.Logger
}

type ExpiredJob struct {
app App
model.ExpiredMember
log *wlog.Logger
}

func NewExpiredManager(app App, store store.Store) *ExpiredManager {
var manager ExpiredManager
manager.app = app
manager.store = store
manager.pool = utils.NewPool(ExpiredWorkers, ExpiredQueue)
manager.log = wlog.GlobalLogger().With(
wlog.Namespace("context"),
wlog.String("name", "expired_manager"),
)
return &manager
}

func (s *ExpiredManager) Start() {
wlog.Debug("starting expired service")
s.log.Debug("starting expired service")
s.watcher = utils.MakeWatcher("Expired", ExpiredPollingInterval, s.job)
s.startOnce.Do(func() {
go s.watcher.Start()
Expand All @@ -54,14 +60,20 @@ func (s *ExpiredManager) Stop() {
func (s *ExpiredManager) job() {
st := time.Now()
if hooks, err := s.store.Member().SetExpired(ExpiredLimit); err != nil {
wlog.Error(err.Error())
s.log.Error(err.Error(),
wlog.Err(err),
)
} else {
wlog.Debug(fmt.Sprintf("set expired members time: %s, hook count %d", time.Now().Sub(st), len(hooks)))
s.log.Debug(fmt.Sprintf("set expired members time: %s, hook count %d", time.Now().Sub(st), len(hooks)))

for _, v := range hooks {
s.pool.Exec(&ExpiredJob{
app: s.app,
ExpiredMember: *v,
log: s.log.With(
wlog.Int64("member_id", v.MemberId),
wlog.Any("schema_id", v.SchemaId),
),
})
}

Expand All @@ -80,8 +92,11 @@ func (v *ExpiredJob) Execute() {
}

if id, err := v.app.FlowManager().Queue().StartFlow(req); err != nil {
wlog.Error(fmt.Sprintf("hook \"leaving\" expired (time %s) member_id=%d, error: %s", time.Now().Sub(st), v.MemberId, err.Error()))
v.log.Error(fmt.Sprintf("hook \"leaving\" expired (time %s) member_id=%d, error: %s", time.Now().Sub(st),
v.MemberId, err.Error()),
wlog.Err(err),
)
} else {
wlog.Debug(fmt.Sprintf("hook \"leaving\" expired (time %s) member_id=%d, job_id: %s", time.Now().Sub(st), v.MemberId, id))
v.log.Debug(fmt.Sprintf("hook \"leaving\" expired (time %s) member_id=%d, job_id: %s", time.Now().Sub(st), v.MemberId, id))
}
}
Loading

0 comments on commit b3b4e68

Please sign in to comment.