From 9d2c763d45e62954f59bbd41177eb37a2e3a09ca Mon Sep 17 00:00:00 2001 From: "i.navrotskyj" Date: Thu, 28 Mar 2024 14:58:55 +0200 Subject: [PATCH] WTEL-4393 --- go.mod | 4 +- grpc_api/client/client.go | 1 + grpc_api/client/member.go | 9 ++++ grpc_api/member.go | 53 +++++++++++++++++++ queue/{queue_agent.go => queue_call_agent.go} | 6 +-- queue/queue_manager.go | 6 ++- 6 files changed, 74 insertions(+), 5 deletions(-) rename queue/{queue_agent.go => queue_call_agent.go} (95%) diff --git a/go.mod b/go.mod index b92c3006..32b7b770 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/streadway/amqp v1.1.0 github.com/webitel/engine v0.0.0-20240327135406-7469d4bcb04b github.com/webitel/flow_manager v0.0.0-20240318151852-e35870a75700 - github.com/webitel/protos/cc v0.0.0-20240327132302-ffcc68b6314f + github.com/webitel/protos/cc v0.0.0-20240328112808-7000c2969bbe github.com/webitel/protos/fs v0.0.0-20240327130525-7501c51c7a8e github.com/webitel/protos/workflow v0.0.0-20240327132302-ffcc68b6314f github.com/webitel/wlog v0.0.0-20220608103744-93b33e61bd28 @@ -57,3 +57,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) + +replace github.com/webitel/flow_manager => ../flow_manager diff --git a/grpc_api/client/client.go b/grpc_api/client/client.go index 7190f205..eaa4cadd 100644 --- a/grpc_api/client/client.go +++ b/grpc_api/client/client.go @@ -33,6 +33,7 @@ type MemberApi interface { JoinCallToQueue(ctx context.Context, in *cc.CallJoinToQueueRequest) (cc.MemberService_CallJoinToQueueClient, error) JoinChatToQueue(ctx context.Context, in *cc.ChatJoinToQueueRequest) (cc.MemberService_ChatJoinToQueueClient, error) CallJoinToAgent(ctx context.Context, in *cc.CallJoinToAgentRequest) (cc.MemberService_CallJoinToAgentClient, error) + TaskJoinToAgent(ctx context.Context, in *cc.TaskJoinToAgentRequest) (cc.MemberService_TaskJoinToAgentClient, error) DirectAgentToMember(domainId int64, memberId int64, communicationId int, agentId int64) (int64, error) CancelAgentDistribute(ctx context.Context, in *cc.CancelAgentDistributeRequest) (*cc.CancelAgentDistributeResponse, error) diff --git a/grpc_api/client/member.go b/grpc_api/client/member.go index eaf84406..48c902e9 100644 --- a/grpc_api/client/member.go +++ b/grpc_api/client/member.go @@ -91,6 +91,15 @@ func (api *memberApi) CallJoinToAgent(ctx context.Context, in *proto.CallJoinToA return cli.member.CallJoinToAgent(ctx, in) } +func (api *memberApi) TaskJoinToAgent(ctx context.Context, in *proto.TaskJoinToAgentRequest) (proto.MemberService_TaskJoinToAgentClient, error) { + cli, err := api.cli.getRandomClient() + if err != nil { + return nil, err + } + + return cli.member.TaskJoinToAgent(ctx, in) +} + func (api *memberApi) CancelAgentDistribute(ctx context.Context, in *proto.CancelAgentDistributeRequest) (*proto.CancelAgentDistributeResponse, error) { cli, err := api.cli.getRandomClient() if err != nil { diff --git a/grpc_api/member.go b/grpc_api/member.go index d3fedc11..5af98eac 100644 --- a/grpc_api/member.go +++ b/grpc_api/member.go @@ -355,6 +355,59 @@ stop: return nil } +func (api *member) TaskJoinToAgent(in *cc.TaskJoinToAgentRequest, out cc.MemberService_TaskJoinToAgentServer) error { + ctx := context.Background() + attempt, err := api.app.Queue().Manager().DistributeTaskToAgent(ctx, in) + if err != nil { + return err + } + + bridged := attempt.On(queue.AttemptHookBridgedAgent) + leaving := attempt.On(queue.AttemptHookLeaving) + + out.Send(&cc.QueueEvent{ + Data: &cc.QueueEvent_Joined{ + Joined: &cc.QueueEvent_JoinedData{ + AttemptId: attempt.Id(), + AppId: "", + }, + }, + }) + + for { + select { + case <-leaving: + out.Send(&cc.QueueEvent{ + Data: &cc.QueueEvent_Leaving{ + Leaving: &cc.QueueEvent_LeavingData{ + Result: attempt.Result(), + }, + }, + }) + goto stop + case _, ok := <-bridged: + if ok { + br := &cc.QueueEvent_BridgedData{ + AgentId: 0, + } + + if attempt.AgentId() != nil { + br.AgentId = int32(*attempt.AgentId()) + } + out.Send(&cc.QueueEvent{ + Data: &cc.QueueEvent_Bridged{ + Bridged: br, + }, + }) + } + } + } + +stop: + + return nil +} + func (api *member) ProcessingFormAction(_ context.Context, in *cc.ProcessingFormActionRequest) (*cc.ProcessingFormActionResponse, error) { err := api.app.Queue().Manager().AttemptProcessingActionForm(in.AttemptId, in.Action, in.Fields) diff --git a/queue/queue_agent.go b/queue/queue_call_agent.go similarity index 95% rename from queue/queue_agent.go rename to queue/queue_call_agent.go index 9788e533..b71a4ecf 100644 --- a/queue/queue_agent.go +++ b/queue/queue_call_agent.go @@ -8,11 +8,11 @@ import ( "time" ) -type JoinAgentQueue struct { +type JoinAgentCallQueue struct { CallingQueue } -func (queue *JoinAgentQueue) DistributeAttempt(attempt *Attempt) *model.AppError { +func (queue *JoinAgentCallQueue) DistributeAttempt(attempt *Attempt) *model.AppError { mCall, ok := queue.CallManager().GetCall(*attempt.member.MemberCallId) if !ok { return NewErrorCallRequired(queue, attempt) @@ -23,7 +23,7 @@ func (queue *JoinAgentQueue) DistributeAttempt(attempt *Attempt) *model.AppError return nil } -func (queue *JoinAgentQueue) run(attempt *Attempt, mCall call_manager.Call) { +func (queue *JoinAgentCallQueue) run(attempt *Attempt, mCall call_manager.Call) { var calling = true var team *agentTeam diff --git a/queue/queue_manager.go b/queue/queue_manager.go index dbe82d17..b5575169 100644 --- a/queue/queue_manager.go +++ b/queue/queue_manager.go @@ -503,7 +503,7 @@ func (queueManager *QueueManager) DistributeCallToAgent(ctx context.Context, in settings.ProcessingRenewalSec = in.Processing.RenewalSec } - var queue = JoinAgentQueue{ + var queue = JoinAgentCallQueue{ CallingQueue: CallingQueue{ BaseQueue: NewBaseQueue(queueManager, queueManager.resourceManager, settings), }, @@ -528,6 +528,10 @@ func (queueManager *QueueManager) DistributeCallToAgent(ctx context.Context, in return attempt, nil } +func (queueManager *QueueManager) DistributeTaskToAgent(ctx context.Context, in *cc.TaskJoinToAgentRequest) (*Attempt, *model.AppError) { + return nil, model.NewAppError("TOFO", "ddd", nil, "", 500) +} + func (queueManager *QueueManager) DistributeChatToQueue(ctx context.Context, in *cc.ChatJoinToQueueRequest) (*Attempt, *model.AppError) { //var member *model.MemberAttempt var bucketId *int32