diff --git a/model/task.go b/model/task.go new file mode 100644 index 00000000..b05cf23f --- /dev/null +++ b/model/task.go @@ -0,0 +1,11 @@ +package model + +type TaskToAgent struct { + AttemptId int64 `json:"attempt_id" db:"attempt_id"` + Destination []byte `json:"destination" db:"destination"` + Variables map[string]string `json:"variables" db:"variables"` + Name string `json:"name" db:"name"` + TeamId int `json:"team_id" db:"team_id"` + TeamUpdatedAt int64 `json:"team_updated_at" db:"team_updated_at"` + AgentUpdatedAt int64 `json:"agent_updated_at" db:"agent_updated_at"` +} diff --git a/queue/agent_call.go b/queue/agent_call.go deleted file mode 100644 index 2e12db3a..00000000 --- a/queue/agent_call.go +++ /dev/null @@ -1,99 +0,0 @@ -package queue - -import ( - "fmt" - "github.com/webitel/call_center/agent_manager" - "github.com/webitel/call_center/call_manager" - "github.com/webitel/call_center/model" -) - -func (queue *CallingQueue) AgentCallRequest(agent agent_manager.AgentObject, at *agentTeam, attempt *Attempt, apps []*model.CallRequestApplication) *model.CallRequest { - cr := &model.CallRequest{ - Endpoints: agent.GetCallEndpoints(), - Strategy: model.CALL_STRATEGY_DEFAULT, - Destination: attempt.Destination(), - Variables: model.UnionStringMaps( - queue.Variables(), - attempt.ExportVariables(), - agent.Variables(), - map[string]string{ - //"ignore_early_media": "true", - //"absolute_codec_string": "opus,pcmu,pcma", - //"sip_h_X-Webitel-Display-Direction": "inbound", - //"bypass_media_resume_on_hold": "true", - "hangup_after_bridge": "true", - "ignore_display_updates": "true", - "cc_reporting": fmt.Sprintf("%v", queue.Processing()), - model.CallVariableDomainId: fmt.Sprintf("%v", queue.DomainId()), - model.CallVariableUserId: fmt.Sprintf("%v", agent.UserId()), - "bridge_export_vars": "cc_agent_id", - "sip_h_X-Webitel-Direction": "internal", - "wbt_destination": attempt.Destination(), - "wbt_to_id": fmt.Sprintf("%v", agent.Id()), - "wbt_to_number": agent.CallNumber(), - "wbt_to_name": agent.Name(), - "wbt_to_type": "user", //todo agent ? - - "wbt_from_name": attempt.Name(), - "wbt_from_type": "member", - "wbt_from_number": attempt.Destination(), - - //"effective_caller_id_name": attempt.Name(), - //"effective_caller_id_number": attempt.Destination(), - // - //"origination_callee_id_name": attempt.Name(), - //"origination_callee_id_number": attempt.Destination(), - "origination_caller_id_name": attempt.Name(), - "origination_caller_id_number": attempt.Destination(), - - model.QUEUE_AGENT_ID_FIELD: fmt.Sprintf("%d", agent.Id()), - model.QUEUE_TEAM_ID_FIELD: fmt.Sprintf("%d", at.Id()), - model.QUEUE_NAME_FIELD: queue.Name(), - model.QUEUE_TYPE_NAME_FIELD: queue.TypeName(), - model.QUEUE_ATTEMPT_ID_FIELD: fmt.Sprintf("%d", attempt.Id()), - }, - ), - Timeout: at.CallTimeout(), - //CallerName: agent.Name(), - //CallerNumber: agent.CallNumber(), - } - - if agent.HasPush() { - cr.SetPush() - } - - queue.SetHoldMusic(cr) - - if queue.id > 0 { - cr.Variables[model.QUEUE_ID_FIELD] = fmt.Sprintf("%d", queue.Id()) - } - - if attempt.MemberId() != nil { - cr.Variables["wbt_from_id"] = fmt.Sprintf("%d", *attempt.MemberId()) - cr.Variables[model.QUEUE_MEMBER_ID_FIELD] = cr.Variables["wbt_from_id"] - } - - if agent.GreetingMedia() != nil { - cr.Applications = append([]*model.CallRequestApplication{ - { - AppName: "playback", - Args: model.RingtoneUri(agent.DomainId(), agent.GreetingMedia().Id, agent.GreetingMedia().Type), - }, - }, apps...) - } else { - cr.Applications = apps - } - - return cr -} - -func (queue *CallingQueue) MissedAgentAttempt(attemptId int64, agentId int, call call_manager.Call) *model.AppError { - missed := &model.MissedAgentAttempt{ - AttemptId: attemptId, - AgentId: agentId, - Cause: call.HangupCause(), - MissedAt: call.HangupAt(), - } - - return queue.queueManager.store.Agent().CreateMissed(missed) -} diff --git a/queue/queue_call.go b/queue/call.go similarity index 60% rename from queue/queue_call.go rename to queue/call.go index 26790b0b..3c8b7fd6 100644 --- a/queue/queue_call.go +++ b/queue/call.go @@ -2,6 +2,7 @@ package queue import ( "fmt" + "github.com/webitel/call_center/agent_manager" "github.com/webitel/call_center/call_manager" "github.com/webitel/call_center/model" ) @@ -39,27 +40,6 @@ func (queue *CallingQueue) SetHoldMusic(callRequest *model.CallRequest) { } } -func IsHuman(call call_manager.Call, amd *model.QueueAmdSettings) bool { - if amd == nil || !amd.Enabled { - return true - } - - if amd.Ai { - aiAmd := call.AiResult() - if aiAmd.Error != "" || aiAmd.Result == "undefined" { - return true // TODO ? - } - for _, v := range amd.PositiveTags { - if v == aiAmd.Result { - return true - } - } - return false - } - - return call.IsHuman() -} - func (queue *CallingQueue) SetAmdCall(callRequest *model.CallRequest, amd *model.QueueAmdSettings, onHuman string) bool { if amd == nil || !amd.Enabled { return false @@ -121,6 +101,86 @@ func (queue *CallingQueue) NewCall(callRequest *model.CallRequest) (call_manager return queue.queueManager.callManager.NewCall(callRequest) } +func (queue *CallingQueue) AgentCallRequest(agent agent_manager.AgentObject, at *agentTeam, attempt *Attempt, apps []*model.CallRequestApplication) *model.CallRequest { + cr := &model.CallRequest{ + Endpoints: agent.GetCallEndpoints(), + Strategy: model.CALL_STRATEGY_DEFAULT, + Destination: attempt.Destination(), + Variables: model.UnionStringMaps( + queue.Variables(), + attempt.ExportVariables(), + agent.Variables(), + map[string]string{ + //"ignore_early_media": "true", + //"absolute_codec_string": "opus,pcmu,pcma", + //"sip_h_X-Webitel-Display-Direction": "inbound", + //"bypass_media_resume_on_hold": "true", + "hangup_after_bridge": "true", + "ignore_display_updates": "true", + "cc_reporting": fmt.Sprintf("%v", queue.Processing()), + model.CallVariableDomainId: fmt.Sprintf("%v", queue.DomainId()), + model.CallVariableUserId: fmt.Sprintf("%v", agent.UserId()), + "bridge_export_vars": "cc_agent_id", + "sip_h_X-Webitel-Direction": "internal", + "wbt_destination": attempt.Destination(), + "wbt_to_id": fmt.Sprintf("%v", agent.Id()), + "wbt_to_number": agent.CallNumber(), + "wbt_to_name": agent.Name(), + "wbt_to_type": "user", //todo agent ? + + "wbt_from_name": attempt.Name(), + "wbt_from_type": "member", + "wbt_from_number": attempt.Destination(), + + //"effective_caller_id_name": attempt.Name(), + //"effective_caller_id_number": attempt.Destination(), + // + //"origination_callee_id_name": attempt.Name(), + //"origination_callee_id_number": attempt.Destination(), + "origination_caller_id_name": attempt.Name(), + "origination_caller_id_number": attempt.Destination(), + + model.QUEUE_AGENT_ID_FIELD: fmt.Sprintf("%d", agent.Id()), + model.QUEUE_TEAM_ID_FIELD: fmt.Sprintf("%d", at.Id()), + model.QUEUE_NAME_FIELD: queue.Name(), + model.QUEUE_TYPE_NAME_FIELD: queue.TypeName(), + model.QUEUE_ATTEMPT_ID_FIELD: fmt.Sprintf("%d", attempt.Id()), + }, + ), + Timeout: at.CallTimeout(), + //CallerName: agent.Name(), + //CallerNumber: agent.CallNumber(), + } + + if agent.HasPush() { + cr.SetPush() + } + + queue.SetHoldMusic(cr) + + if queue.id > 0 { + cr.Variables[model.QUEUE_ID_FIELD] = fmt.Sprintf("%d", queue.Id()) + } + + if attempt.MemberId() != nil { + cr.Variables["wbt_from_id"] = fmt.Sprintf("%d", *attempt.MemberId()) + cr.Variables[model.QUEUE_MEMBER_ID_FIELD] = cr.Variables["wbt_from_id"] + } + + if agent.GreetingMedia() != nil { + cr.Applications = append([]*model.CallRequestApplication{ + { + AppName: "playback", + Args: model.RingtoneUri(agent.DomainId(), agent.GreetingMedia().Id, agent.GreetingMedia().Type), + }, + }, apps...) + } else { + cr.Applications = apps + } + + return cr +} + func (queue *CallingQueue) HangupManyCall(skipId, cause string, ids ...string) { if len(ids) == 1 { @@ -184,3 +244,35 @@ func (queue *CallingQueue) GetTransferredCall(id string) (call_manager.Call, *mo func (queue *CallingQueue) GranteeId() *int { return queue.granteeId } + +func (queue *CallingQueue) MissedAgentAttempt(attemptId int64, agentId int, call call_manager.Call) *model.AppError { + missed := &model.MissedAgentAttempt{ + AttemptId: attemptId, + AgentId: agentId, + Cause: call.HangupCause(), + MissedAt: call.HangupAt(), + } + + return queue.queueManager.store.Agent().CreateMissed(missed) +} + +func IsHuman(call call_manager.Call, amd *model.QueueAmdSettings) bool { + if amd == nil || !amd.Enabled { + return true + } + + if amd.Ai { + aiAmd := call.AiResult() + if aiAmd.Error != "" || aiAmd.Result == "undefined" { + return true // TODO ? + } + for _, v := range amd.PositiveTags { + if v == aiAmd.Result { + return true + } + } + return false + } + + return call.IsHuman() +} diff --git a/queue/queue_call_agent.go b/queue/call_agent.go similarity index 100% rename from queue/queue_call_agent.go rename to queue/call_agent.go diff --git a/queue/queue_call_inbound.go b/queue/call_inbound.go similarity index 100% rename from queue/queue_call_inbound.go rename to queue/call_inbound.go diff --git a/queue/queue_call_ivr.go b/queue/call_ivr.go similarity index 99% rename from queue/queue_call_ivr.go rename to queue/call_ivr.go index 1cb26629..3352cd1e 100644 --- a/queue/queue_call_ivr.go +++ b/queue/call_ivr.go @@ -27,7 +27,7 @@ type IVRQueue struct { QueueIVRSettings } -func QueueIVRSettingsFromBytes(data []byte) QueueIVRSettings { +func IVRSettingsFromBytes(data []byte) QueueIVRSettings { var settings QueueIVRSettings json.Unmarshal(data, &settings) return settings diff --git a/queue/queue_call_offline.go b/queue/call_offline.go similarity index 98% rename from queue/queue_call_offline.go rename to queue/call_offline.go index a490ba67..d8c7a8a8 100644 --- a/queue/queue_call_offline.go +++ b/queue/call_offline.go @@ -25,7 +25,7 @@ type OfflineCallQueue struct { OfflineQueueSettings } -func QueueOfflineSettingsFromBytes(data []byte) OfflineQueueSettings { +func OfflineSettingsFromBytes(data []byte) OfflineQueueSettings { var settings OfflineQueueSettings json.Unmarshal(data, &settings) return settings diff --git a/queue/queue_call_predict.go b/queue/call_predict.go similarity index 100% rename from queue/queue_call_predict.go rename to queue/call_predict.go diff --git a/queue/queue_call_preview.go b/queue/call_preview.go similarity index 100% rename from queue/queue_call_preview.go rename to queue/call_preview.go diff --git a/queue/queue_call_progressive.go b/queue/call_progressive.go similarity index 100% rename from queue/queue_call_progressive.go rename to queue/call_progressive.go diff --git a/queue/queue_chat_inbound.go b/queue/chat_inbound.go similarity index 100% rename from queue/queue_chat_inbound.go rename to queue/chat_inbound.go diff --git a/queue/dialing.go b/queue/dialing.go index 6ebbb03b..0337eb34 100644 --- a/queue/dialing.go +++ b/queue/dialing.go @@ -40,19 +40,19 @@ func NewDialing(app App, m mq.MQ, callManager call_manager.CallManager, agentMan return &dialing } -func (dialing *DialingImpl) Manager() *QueueManager { - return dialing.queueManager +func (d *DialingImpl) Manager() *QueueManager { + return d.queueManager } -func (dialing *DialingImpl) Start() { +func (d *DialingImpl) Start() { wlog.Debug("starting dialing service") - dialing.watcher = utils.MakeWatcher("Dialing", DEFAULT_WATCHER_POLLING_INTERVAL, dialing.routeData) + d.watcher = utils.MakeWatcher("Dialing", DEFAULT_WATCHER_POLLING_INTERVAL, d.routeData) - dialing.startOnce.Do(func() { - go dialing.watcher.Start() - go dialing.queueManager.Start() - go dialing.statisticsManager.Start() - go dialing.expiredManager.Start() + d.startOnce.Do(func() { + go d.watcher.Start() + go d.queueManager.Start() + go d.statisticsManager.Start() + go d.expiredManager.Start() }) } diff --git a/queue/queue_email_inbound.go b/queue/email_inbound.go similarity index 100% rename from queue/queue_email_inbound.go rename to queue/email_inbound.go diff --git a/queue/errors.go b/queue/errors.go index c7cf5beb..576526ee 100644 --- a/queue/errors.go +++ b/queue/errors.go @@ -44,13 +44,3 @@ func NewErrorVariableRequired(queue QueueObject, attempt *Attempt, name string) http.StatusBadRequest, ) } - -func NewErrorCommunicationPatternRequired(queue QueueObject, attempt *Attempt) *model.AppError { - return model.NewAppError( - "Queue", - "queue.distribute.invalid_communication_pattern.error", - map[string]interface{}{"QueueId": queue.Id(), "AttemptId": attempt.Id()}, - "", - http.StatusUnauthorized, - ) -} diff --git a/queue/queue_hook.go b/queue/hook.go similarity index 100% rename from queue/queue_hook.go rename to queue/hook.go diff --git a/queue/queue_manual.go b/queue/manual.go similarity index 100% rename from queue/queue_manual.go rename to queue/manual.go diff --git a/queue/queue.go b/queue/queue.go index c7344fa4..3592814c 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -123,7 +123,7 @@ func NewQueue(queueManager *QueueManager, resourceManager *ResourceManager, sett BaseQueue: base, HoldMusic: settings.HoldMusic, granteeId: settings.GranteeId, - }, QueueOfflineSettingsFromBytes(settings.Payload)), nil + }, OfflineSettingsFromBytes(settings.Payload)), nil case model.QueueTypeInboundCall: inboundSettings := model.QueueInboundSettingsFromBytes(settings.Payload) @@ -138,7 +138,7 @@ func NewQueue(queueManager *QueueManager, resourceManager *ResourceManager, sett BaseQueue: base, HoldMusic: settings.HoldMusic, granteeId: settings.GranteeId, - }, QueueIVRSettingsFromBytes(settings.Payload)), nil + }, IVRSettingsFromBytes(settings.Payload)), nil case model.QueueTypePreviewCall: return NewPreviewCallQueue(CallingQueue{ @@ -165,7 +165,7 @@ func NewQueue(queueManager *QueueManager, resourceManager *ResourceManager, sett return NewInboundChatQueue(base, InboundChatQueueFromBytes(settings.Payload)), nil case model.QueueTypeAgentTask: - return NewTaskAgentQueue(base, TaskAgentSettingsFromBytes(settings.Payload)), nil + return NewTaskInboundQueue(base, TaskInboundSettingsFromBytes(settings.Payload)), nil case model.QueueTypeOutboundTask: return NewTaskOutboundQueue(base, TaskOutboundQueueSettingsFromBytes(settings.Payload)), nil diff --git a/queue/queue_agent_task.go b/queue/queue_agent_task.go deleted file mode 100644 index 2527113e..00000000 --- a/queue/queue_agent_task.go +++ /dev/null @@ -1,199 +0,0 @@ -package queue - -import ( - "encoding/json" - "fmt" - "github.com/webitel/call_center/agent_manager" - "github.com/webitel/call_center/model" - "net/http" - "strconv" - "sync" - "time" -) - -type TaskState uint8 - -const ( - TaskStateIdle TaskState = iota - TaskStateBridged - TaskStateClosed -) - -type TaskAgentQueue struct { - BaseQueue - TaskAgentQueueSettings -} - -type TaskAgentQueueSettings struct { - MaxAttempts uint `json:"max_attempts"` - PerNumbers bool `json:"per_numbers"` - WaitBetweenRetries uint64 `json:"wait_between_retries"` - WaitBetweenRetriesDesc bool `json:"wait_between_retries_desc"` -} - -// todo max working task ? -type TaskChannel struct { - id string - state TaskState - stateC chan TaskState - createdAt int64 - bridgedAt int64 - closedAt int64 - reportingAt int64 - sync.RWMutex -} - -func NewTaskChannel(id string) *TaskChannel { - return &TaskChannel{ - id: id, - createdAt: model.GetMillis(), - state: TaskStateIdle, - stateC: make(chan TaskState), - } -} - -func (t *TaskChannel) Id() string { - return t.id -} - -func (t *TaskChannel) ReportingAt() int64 { - t.RLock() - defer t.RUnlock() - - return t.reportingAt -} - -func (t *TaskChannel) setState(state TaskState) { - t.state = state - t.stateC <- t.state -} - -func (t *TaskChannel) Answered() bool { - t.RLock() - a := t.bridgedAt - t.RUnlock() - return a > 0 -} - -func (t *TaskChannel) SetAnswered() *model.AppError { - t.Lock() - defer t.Unlock() - - if t.bridgedAt != 0 { - return model.NewAppError("TaskChannel", "queue.task.valid.bridged_at", nil, - fmt.Sprintf("task %s is bridged", t.id), http.StatusBadRequest) - } - - t.bridgedAt = model.GetMillis() - t.setState(TaskStateBridged) - return nil -} - -func (t *TaskChannel) SetClosed() *model.AppError { - t.Lock() - defer t.Unlock() - - if t.closedAt != 0 { - return model.NewAppError("TaskChannel", "queue.task.valid.closed_at", nil, - fmt.Sprintf("task %s is closed", t.id), http.StatusBadRequest) - } - - t.closedAt = model.GetMillis() - t.setState(TaskStateClosed) - return nil -} - -func (t *TaskChannel) Reporting() *model.AppError { - t.Lock() - t.reportingAt = model.GetMillis() - t.Unlock() - - if t.closedAt == 0 { - return t.SetClosed() - } - - return nil -} - -func (t *TaskChannel) IsDeclined() bool { - t.RLock() - defer t.RUnlock() - return t.bridgedAt == 0 -} - -func TaskAgentSettingsFromBytes(data []byte) TaskAgentQueueSettings { - var settings TaskAgentQueueSettings - json.Unmarshal(data, &settings) - return settings -} - -func NewTaskAgentQueue(base BaseQueue, settings TaskAgentQueueSettings) QueueObject { - return &TaskAgentQueue{ - BaseQueue: base, - TaskAgentQueueSettings: settings, - } -} - -func (queue *TaskAgentQueue) DistributeAttempt(attempt *Attempt) *model.AppError { - if attempt.agent == nil { - return NewErrorAgentRequired(queue, attempt) - } - - team, err := queue.GetTeam(attempt) - if err != nil { - return err - } - - task := NewTaskChannel(strconv.Itoa(int(attempt.Id()))) - attempt.channelData = task - - attempt.waitBetween = queue.WaitBetweenRetries - attempt.maxAttempts = queue.MaxAttempts - attempt.perNumbers = queue.PerNumbers - - go queue.run(team, attempt, attempt.Agent(), task) - return nil -} - -func (queue *TaskAgentQueue) run(team *agentTeam, attempt *Attempt, agent agent_manager.AgentObject, task *TaskChannel) { - if !queue.queueManager.DoDistributeSchema(&queue.BaseQueue, attempt) { - queue.queueManager.LeavingMember(attempt) - return - } - - timeout := time.NewTimer(time.Second * time.Duration(team.TaskAcceptTimeout())) - process := true - - team.Distribute(queue, agent, NewDistributeEvent(attempt, agent.UserId(), queue, agent, queue.Processing(), nil, task)) - team.Offering(attempt, agent, task, nil) - - for process { - select { - case s := <-task.stateC: - switch s { - case TaskStateBridged: - timeout.Stop() - team.Bridged(attempt, agent) - case TaskStateClosed: - timeout.Stop() - process = false - } - case <-timeout.C: - attempt.Log("timeout") - process = false - break - } - } - - if task.IsDeclined() && task.ReportingAt() == 0 { - team.CancelAgentAttempt(attempt, agent) - queue.queueManager.LeavingMember(attempt) - } else { - team.Reporting(queue, attempt, agent, task.ReportingAt() > 0, false) - } -} - -func (t *TaskChannel) Stats() map[string]string { - // todo - return map[string]string{} -} diff --git a/queue/queue_manager.go b/queue/queue_manager.go index b5575169..df21c083 100644 --- a/queue/queue_manager.go +++ b/queue/queue_manager.go @@ -529,7 +529,95 @@ func (queueManager *QueueManager) DistributeCallToAgent(ctx context.Context, in } func (queueManager *QueueManager) DistributeTaskToAgent(ctx context.Context, in *cc.TaskJoinToAgentRequest) (*Attempt, *model.AppError) { - return nil, model.NewAppError("TOFO", "ddd", nil, "", 500) + var agent agent_manager.AgentObject + res, err := queueManager.store.Member().DistributeTaskToAgent( + queueManager.app.GetInstanceId(), + in.DomainId, + in.GetAgentId(), + []byte(`{"destination":"1232131231"}`), + in.GetVariables(), + in.CancelDistribute, + ) + + if err != nil { + wlog.Error(err.Error()) + return nil, err + } + + if in.CancelDistribute { + err = queueManager.CancelAgentDistribute(in.GetAgentId()) + if err != nil { + wlog.Error(err.Error()) + } + } + + agent, err = queueManager.agentManager.GetAgent(int(in.GetAgentId()), res.AgentUpdatedAt) + if err != nil { + wlog.Error(err.Error()) + return nil, err + } + + attempt, _ := queueManager.CreateAttemptIfNotExists(ctx, &model.MemberAttempt{ + Id: res.AttemptId, + CreatedAt: time.Now(), + Result: nil, + Destination: res.Destination, + AgentId: model.NewInt(int(in.AgentId)), + AgentUpdatedAt: &res.AgentUpdatedAt, + TeamUpdatedAt: model.NewInt64(res.TeamUpdatedAt), + Variables: res.Variables, + Name: res.Name, + }) + + if in.QueueName == "" { + in.QueueName = "Agent" + } + + settings := &model.Queue{ + Id: 0, + DomainId: in.DomainId, + DomainName: "TODO", + Type: model.QueueTypeAgentTask, + Name: in.QueueName, + Strategy: "", + Payload: nil, + TeamId: &res.TeamId, + Processing: false, + ProcessingSec: 30, + ProcessingRenewalSec: 15, + Hooks: nil, + FormSchemaId: model.NewInt(604), + Variables: map[string]string{ + "wbt_auto_answer": "true", + }, + } + if in.Processing != nil && in.Processing.Enabled { + settings.Processing = true + settings.ProcessingSec = in.Processing.Sec + settings.ProcessingRenewalSec = in.Processing.RenewalSec + } + + var queue = TaskAgent{ + BaseQueue: NewBaseQueue(queueManager, queueManager.resourceManager, settings), + } + + attempt.queue = &queue + attempt.agent = agent + attempt.domainId = queue.domainId + attempt.channel = model.QueueChannelTask + + if err = queue.DistributeAttempt(attempt); err != nil { + wlog.Error(err.Error()) + queueManager.Abandoned(attempt) + queueManager.LeavingMember(attempt) + + return nil, err + } else { + wlog.Info(fmt.Sprintf("[%s] join member %s[%v] AttemptId=%d to queue \"%s\" (size %d, waiting %d, active %d)", queue.TypeName(), attempt.Name(), + attempt.MemberId(), attempt.Id(), queue.Name(), attempt.member.QueueCount, attempt.member.QueueWaitingCount, attempt.member.QueueActiveCount)) + } + + return attempt, nil } func (queueManager *QueueManager) DistributeChatToQueue(ctx context.Context, in *cc.ChatJoinToQueueRequest) (*Attempt, *model.AppError) { diff --git a/queue/task_agent.go b/queue/task_agent.go new file mode 100644 index 00000000..a61aa082 --- /dev/null +++ b/queue/task_agent.go @@ -0,0 +1,74 @@ +package queue + +import ( + "github.com/webitel/call_center/agent_manager" + "github.com/webitel/call_center/model" + "strconv" + "time" +) + +const ( + TaskStateIdle TaskState = iota + TaskStateBridged + TaskStateClosed +) + +type TaskAgent struct { + BaseQueue +} + +func (queue *TaskAgent) DistributeAttempt(attempt *Attempt) *model.AppError { + if attempt.agent == nil { + return NewErrorAgentRequired(queue, attempt) + } + + team, err := queue.GetTeam(attempt) + if err != nil { + return err + } + + task := NewTaskChannel(strconv.Itoa(int(attempt.Id()))) + attempt.channelData = task + + go queue.run(team, attempt, attempt.agent, task) + + return nil +} + +func (queue *TaskAgent) run(team *agentTeam, attempt *Attempt, agent agent_manager.AgentObject, task *TaskChannel) { + if !queue.queueManager.DoDistributeSchema(&queue.BaseQueue, attempt) { + queue.queueManager.LeavingMember(attempt) + return + } + + timeout := time.NewTimer(time.Second * time.Duration(team.TaskAcceptTimeout())) + process := true + + team.Distribute(queue, agent, NewDistributeEvent(attempt, agent.UserId(), queue, agent, queue.Processing(), nil, task)) + team.Offering(attempt, agent, task, nil) + + for process { + select { + case s := <-task.stateC: + switch s { + case TaskStateBridged: + timeout.Stop() + team.Bridged(attempt, agent) + case TaskStateClosed: + timeout.Stop() + process = false + } + case <-timeout.C: + attempt.Log("timeout") + process = false + break + } + } + + if task.IsDeclined() && task.ReportingAt() == 0 { + team.CancelAgentAttempt(attempt, agent) + queue.queueManager.LeavingMember(attempt) + } else { + team.Reporting(queue, attempt, agent, task.ReportingAt() > 0, false) + } +} diff --git a/queue/task_channel.go b/queue/task_channel.go new file mode 100644 index 00000000..7254485c --- /dev/null +++ b/queue/task_channel.go @@ -0,0 +1,104 @@ +package queue + +import ( + "fmt" + "github.com/webitel/call_center/model" + "net/http" + "sync" +) + +type TaskState uint8 + +type TaskChannel struct { + id string + state TaskState + stateC chan TaskState + createdAt int64 + bridgedAt int64 + closedAt int64 + reportingAt int64 + sync.RWMutex +} + +func NewTaskChannel(id string) *TaskChannel { + return &TaskChannel{ + id: id, + createdAt: model.GetMillis(), + state: TaskStateIdle, + stateC: make(chan TaskState), + } +} + +func (t *TaskChannel) Id() string { + return t.id +} + +func (t *TaskChannel) ReportingAt() int64 { + t.RLock() + defer t.RUnlock() + + return t.reportingAt +} + +func (t *TaskChannel) setState(state TaskState) { + t.state = state + t.stateC <- t.state +} + +func (t *TaskChannel) Answered() bool { + t.RLock() + a := t.bridgedAt + t.RUnlock() + return a > 0 +} + +func (t *TaskChannel) SetAnswered() *model.AppError { + t.Lock() + defer t.Unlock() + + if t.bridgedAt != 0 { + return model.NewAppError("TaskChannel", "queue.task.valid.bridged_at", nil, + fmt.Sprintf("task %s is bridged", t.id), http.StatusBadRequest) + } + + t.bridgedAt = model.GetMillis() + t.setState(TaskStateBridged) + return nil +} + +func (t *TaskChannel) SetClosed() *model.AppError { + t.Lock() + defer t.Unlock() + + if t.closedAt != 0 { + return model.NewAppError("TaskChannel", "queue.task.valid.closed_at", nil, + fmt.Sprintf("task %s is closed", t.id), http.StatusBadRequest) + } + + t.closedAt = model.GetMillis() + t.setState(TaskStateClosed) + return nil +} + +func (t *TaskChannel) Reporting() *model.AppError { + t.Lock() + t.reportingAt = model.GetMillis() + t.Unlock() + + if t.closedAt == 0 { + return t.SetClosed() + } + + return nil +} + +func (t *TaskChannel) IsDeclined() bool { + t.RLock() + defer t.RUnlock() + return t.bridgedAt == 0 +} + +func (t *TaskChannel) Stats() map[string]string { + // todo + return map[string]string{} +} diff --git a/queue/task_inbound.go b/queue/task_inbound.go new file mode 100644 index 00000000..8025e5e7 --- /dev/null +++ b/queue/task_inbound.go @@ -0,0 +1,93 @@ +package queue + +import ( + "encoding/json" + "github.com/webitel/call_center/agent_manager" + "github.com/webitel/call_center/model" + "strconv" + "time" +) + +type TaskInboundQueue struct { + BaseQueue + TaskInboundQueueSettings +} + +type TaskInboundQueueSettings struct { + MaxAttempts uint `json:"max_attempts"` + PerNumbers bool `json:"per_numbers"` + WaitBetweenRetries uint64 `json:"wait_between_retries"` + WaitBetweenRetriesDesc bool `json:"wait_between_retries_desc"` +} + +func TaskInboundSettingsFromBytes(data []byte) TaskInboundQueueSettings { + var settings TaskInboundQueueSettings + json.Unmarshal(data, &settings) + return settings +} + +func NewTaskInboundQueue(base BaseQueue, settings TaskInboundQueueSettings) QueueObject { + return &TaskInboundQueue{ + BaseQueue: base, + TaskInboundQueueSettings: settings, + } +} + +func (queue *TaskInboundQueue) DistributeAttempt(attempt *Attempt) *model.AppError { + if attempt.agent == nil { + return NewErrorAgentRequired(queue, attempt) + } + + team, err := queue.GetTeam(attempt) + if err != nil { + return err + } + + task := NewTaskChannel(strconv.Itoa(int(attempt.Id()))) + attempt.channelData = task + + attempt.waitBetween = queue.WaitBetweenRetries + attempt.maxAttempts = queue.MaxAttempts + attempt.perNumbers = queue.PerNumbers + + go queue.run(team, attempt, attempt.Agent(), task) + return nil +} + +func (queue *TaskInboundQueue) run(team *agentTeam, attempt *Attempt, agent agent_manager.AgentObject, task *TaskChannel) { + if !queue.queueManager.DoDistributeSchema(&queue.BaseQueue, attempt) { + queue.queueManager.LeavingMember(attempt) + return + } + + timeout := time.NewTimer(time.Second * time.Duration(team.TaskAcceptTimeout())) + process := true + + team.Distribute(queue, agent, NewDistributeEvent(attempt, agent.UserId(), queue, agent, queue.Processing(), nil, task)) + team.Offering(attempt, agent, task, nil) + + for process { + select { + case s := <-task.stateC: + switch s { + case TaskStateBridged: + timeout.Stop() + team.Bridged(attempt, agent) + case TaskStateClosed: + timeout.Stop() + process = false + } + case <-timeout.C: + attempt.Log("timeout") + process = false + break + } + } + + if task.IsDeclined() && task.ReportingAt() == 0 { + team.CancelAgentAttempt(attempt, agent) + queue.queueManager.LeavingMember(attempt) + } else { + team.Reporting(queue, attempt, agent, task.ReportingAt() > 0, false) + } +} diff --git a/queue/queue_outbound_task.go b/queue/task_outbound.go similarity index 100% rename from queue/queue_outbound_task.go rename to queue/task_outbound.go diff --git a/store/sqlstore/member_store.go b/store/sqlstore/member_store.go index d6f56f1f..6a573956 100644 --- a/store/sqlstore/member_store.go +++ b/store/sqlstore/member_store.go @@ -214,6 +214,36 @@ where :Force::bool or not exists(select 1 from call_center.cc_member_attempt a w return att, nil } +func (s SqlMemberStore) DistributeTaskToAgent(node string, domainId int64, agentId int32, dest []byte, vars map[string]string, force bool) (*model.TaskToAgent, *model.AppError) { + var att *model.TaskToAgent + + err := s.GetMaster().SelectOne(&att, `select * +from call_center.cc_distribute_task_to_agent(:Node, :DomainId, :AgentId, :Dest::jsonb, :Variables) +as x ( + attempt_id int8, + destination jsonb, + variables jsonb, + team_id int, + team_updated_at int8, + agent_updated_at int8 +) +where :Force::bool or not exists(select 1 from call_center.cc_member_attempt a where a.agent_id = :AgentId and a.state != 'leaving' for update )`, map[string]interface{}{ + "Node": node, + "DomainId": domainId, + "Dest": dest, + "Variables": model.MapToJson(vars), + "AgentId": agentId, + "Force": force, + }) + + if err != nil { + return nil, model.NewAppError("SqlMemberStore.DistributeTaskToAgent", "store.sql_member.distribute_task_agent.app_error", nil, + fmt.Sprintf("AgentId=%v %s", agentId, err.Error()), http.StatusInternalServerError) + } + + return att, nil +} + func (s SqlMemberStore) DistributeCallToQueueCancel(id int64) *model.AppError { _, err := s.GetMaster().Exec(`update call_center.cc_member_attempt set result = 'cancel', diff --git a/store/store.go b/store/store.go index 3d1b6ab6..8314180a 100644 --- a/store/store.go +++ b/store/store.go @@ -51,6 +51,7 @@ type MemberStore interface { DistributeCallToQueue(node string, queueId int64, callId string, vars map[string]string, bucketId *int32, priority int, stickyAgentId *int) (*model.InboundCallQueue, *model.AppError) DistributeCallToQueueCancel(id int64) *model.AppError DistributeCallToAgent(node string, callId string, vars map[string]string, agentId int32, force bool) (*model.InboundCallAgent, *model.AppError) + DistributeTaskToAgent(node string, domainId int64, agentId int32, dest []byte, vars map[string]string, force bool) (*model.TaskToAgent, *model.AppError) /* Flow control