Skip to content

Commit

Permalink
Merge pull request #81 from webitel/v24.10
Browse files Browse the repository at this point in the history
V24.10
  • Loading branch information
navrotskyj authored Dec 4, 2024
2 parents 8e5386a + b3b4e68 commit a72888f
Show file tree
Hide file tree
Showing 52 changed files with 1,149 additions and 690 deletions.
14 changes: 13 additions & 1 deletion agent_manager/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,35 @@ package agent_manager
import (
"fmt"
"github.com/webitel/call_center/model"
"github.com/webitel/wlog"
"strconv"
"sync"
)

type Agent struct {
info *model.Agent
manager AgentManager
log *wlog.Logger
sync.RWMutex
}

func NewAgent(info *model.Agent, am AgentManager) AgentObject {
func NewAgent(info *model.Agent, am AgentManager, log *wlog.Logger) AgentObject {
return &Agent{
info: info,
manager: am,
log: log.With(
wlog.Int64("user_id", info.GetUserId()),
wlog.Int("agent_id", info.Id),
wlog.Int("team_id", info.TeamId),
wlog.Int64("domain_id", info.DomainId),
),
}
}

func (agent *Agent) Log() *wlog.Logger {
return agent.log
}

func (agent *Agent) DomainId() int64 {
return agent.info.DomainId
}
Expand Down
36 changes: 23 additions & 13 deletions agent_manager/agent_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@ type agentManager struct {
startOnce sync.Once
agentsCache utils.ObjectCache
hookAutoOfflineAgent HookAutoOfflineAgent
log *wlog.Logger
sync.Mutex
}

func NewAgentManager(nodeId string, s store.Store, mq_ mq.MQ) AgentManager {
var am agentManager
am.store = s
am.mq = mq_
am.nodeId = nodeId
am.agentsCache = utils.NewLruWithParams(sizeAgentChane, "Agents", expireAgentCache, "")
func NewAgentManager(nodeId string, s store.Store, mq_ mq.MQ, log *wlog.Logger) AgentManager {
am := agentManager{
store: s,
mq: mq_,
nodeId: nodeId,
agentsCache: utils.NewLruWithParams(sizeAgentChane, "Agents", expireAgentCache, ""),
log: log.With(
wlog.Namespace("context"),
wlog.String("name", "agent_manager"),
),
}

return &am
}
Expand All @@ -49,7 +55,7 @@ func (am *agentManager) SetHookAutoOfflineAgent(hook HookAutoOfflineAgent) {
}

func (am *agentManager) Start() {
wlog.Debug("starting agent service")
am.log.Debug("starting agent service")
am.watcher = utils.MakeWatcher("AgentManager", watcherPollingInterval, am.changeDeadlineState)
am.startOnce.Do(func() {
go am.watcher.Start()
Expand All @@ -76,18 +82,20 @@ func (am *agentManager) GetAgent(id int, updatedAt int64) (AgentObject, *model.A
if a, err := am.store.Agent().Get(id); err != nil {
return nil, err
} else {
agent = NewAgent(a, am)
agent = NewAgent(a, am, am.log)
}

am.agentsCache.AddWithDefaultExpires(id, agent)
wlog.Debug(fmt.Sprintf("add agent to cache %v", agent.Name()))
agent.Log().Debug(fmt.Sprintf("add agent to cache %v", agent.Name()))
return agent, nil
}

func (am *agentManager) SetOnline(agent AgentObject, onDemand bool) (*model.AgentOnlineData, *model.AppError) {
data, err := am.store.Agent().SetOnline(agent.Id(), onDemand)
if err != nil {
wlog.Error(fmt.Sprintf("agent %s[%d] has been changed status to \"%s\" error: %s", agent.Name(), agent.Id(), model.AgentStatusOnline, err.Error()))
agent.Log().Error(
fmt.Sprintf("agent %s[%d] has been changed status to \"%s\" error: %s", agent.Name(), agent.Id(), model.AgentStatusOnline, err.Error()),
)
return nil, err
}

Expand All @@ -101,7 +109,7 @@ func (am *agentManager) SetOnline(agent AgentObject, onDemand bool) (*model.Agen

func (am *agentManager) setAgentStatus(agent AgentObject, status *model.AgentStatus) *model.AppError {
if err := am.store.Agent().SetStatus(agent.Id(), status.Status, status.StatusPayload); err != nil {
wlog.Error(fmt.Sprintf("agent %s[%d] has been changed state to \"%s\" error: %s", agent.Name(), agent.Id(), status.Status, err.Error()))
agent.Log().Error(fmt.Sprintf("agent %s[%d] has been changed state to \"%s\" error: %s", agent.Name(), agent.Id(), status.Status, err.Error()))
return err
}

Expand Down Expand Up @@ -186,7 +194,9 @@ func (am *agentManager) SetBreakOut(agent AgentObject) *model.AppError {
// todo new watcher &
func (am *agentManager) changeDeadlineState() {
if items, err := am.store.Agent().OnlineWithOutActive(MaxAgentOnlineWithOutSocSec); err != nil {
wlog.Error(err.Error())
am.log.Error(err.Error(),
wlog.Err(err),
)
} else {
for _, v := range items {
if a, _ := am.GetAgent(v.Id, v.UpdatedAt); a != nil {
Expand All @@ -205,7 +215,7 @@ func (am *agentManager) changeDeadlineState() {
}
err = am.SetOffline(a, s)
if err != nil {
wlog.Error(err.Error())
a.Log().Error(err.Error())
} else if am.hookAutoOfflineAgent != nil {
am.hookAutoOfflineAgent(a)
}
Expand Down
9 changes: 2 additions & 7 deletions agent_manager/am_agent_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@ package agent_manager
import (
"fmt"
"github.com/webitel/call_center/model"
"github.com/webitel/wlog"
)

func (am *agentManager) notifyChangeAgentState(agent AgentObject, state string) {
//fmt.Println(agent)
}

func NewAgentEventStatus(agent AgentObject, event model.AgentEventStatus) model.Event {
wlog.Info(fmt.Sprintf("agent %s[%d] has been changed status to \"%s\"", agent.Name(), agent.Id(), event.Status))
agent.Log().Info(fmt.Sprintf("agent %s[%d] has been changed status to \"%s\"", agent.Name(), agent.Id(), event.Status))
return model.NewEvent(model.AgentChangedStatusEvent, agent.UserId(), event)
}

func NewAgentEventOnlineStatus(agent AgentObject, info *model.AgentOnlineData, onDemand bool) model.Event {
wlog.Info(fmt.Sprintf("agent %s[%d] has been changed status to \"%s\"", agent.Name(), agent.Id(), model.AgentStatusOnline))
agent.Log().Info(fmt.Sprintf("agent %s[%d] has been changed status to \"%s\"", agent.Name(), agent.Id(), model.AgentStatusOnline))
return model.NewEvent(model.AgentChangedStatusEvent, agent.UserId(), model.AgentEventOnlineStatus{
Channels: info.Channel,
OnDemand: onDemand,
Expand Down
6 changes: 5 additions & 1 deletion agent_manager/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package agent_manager

import "github.com/webitel/call_center/model"
import (
"github.com/webitel/call_center/model"
"github.com/webitel/wlog"
)

type AgentManager interface {
Start()
Expand Down Expand Up @@ -39,4 +42,5 @@ type AgentObject interface {
HasPush() bool
HookData() map[string]string
StoreStatus(s model.AgentStatus)
Log() *wlog.Logger
}
5 changes: 4 additions & 1 deletion app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (app *App) SetAgentBreakOut(agent agent_manager.AgentObject) *model.AppErro
func (app *App) hangupNoAnswerChannels(chs []*model.CallNoAnswer) {
for _, ch := range chs {
if err := app.callManager.HangupById(ch.Id, ch.AppId); err != nil {
wlog.Error(err.Error())
app.Log.Error(err.Error(),
wlog.Err(err),
wlog.String("call_id", ch.Id),
)
}
}
}
Expand Down
80 changes: 65 additions & 15 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/webitel/call_center/agent_manager"
Expand All @@ -18,6 +19,18 @@ import (
"github.com/webitel/flow_manager/client"
"github.com/webitel/wlog"
"sync/atomic"

otelsdk "github.com/webitel/webitel-go-kit/otel/sdk"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

// -------------------- plugin(s) -------------------- //
_ "github.com/webitel/webitel-go-kit/otel/sdk/log/otlp"
_ "github.com/webitel/webitel-go-kit/otel/sdk/log/stdout"
_ "github.com/webitel/webitel-go-kit/otel/sdk/metric/otlp"
_ "github.com/webitel/webitel-go-kit/otel/sdk/metric/stdout"
_ "github.com/webitel/webitel-go-kit/otel/sdk/trace/otlp"
_ "github.com/webitel/webitel-go-kit/otel/sdk/trace/stdout"
)

type App struct {
Expand All @@ -38,10 +51,15 @@ type App struct {
flowManager client.FlowManager
chatManager *chat.ChatManager
triggerManager *trigger.Manager

ctx context.Context
otelShutdownFunc otelsdk.ShutdownFunc
}

func New(options ...string) (outApp *App, outErr error) {
app := &App{}
app := &App{
ctx: context.Background(),
}

defer func() {
if outErr != nil {
Expand All @@ -52,18 +70,46 @@ func New(options ...string) (outApp *App, outErr error) {
if err := app.LoadConfig(app.configFile); err != nil {
return nil, err
}
config := app.Config()
// TODO
app.setServiceId(app.Config().ServiceSettings.NodeId)

app.Log = wlog.NewLogger(&wlog.LoggerConfiguration{
EnableConsole: true,
ConsoleLevel: wlog.LevelDebug,
})
logConfig := &wlog.LoggerConfiguration{
EnableConsole: config.Log.Console,
ConsoleJson: false,
ConsoleLevel: config.Log.Lvl,
}

if config.Log.File != "" {
logConfig.FileLocation = config.Log.File
logConfig.EnableFile = true
logConfig.FileJson = true
logConfig.FileLevel = config.Log.Lvl
}

if config.Log.Otel {
// TODO
var err error
logConfig.EnableExport = true
app.otelShutdownFunc, err = otelsdk.Configure(
app.ctx,
otelsdk.WithResource(resource.NewSchemaless(
semconv.ServiceName(model.ServiceName),
semconv.ServiceVersion(model.CurrentVersion),
semconv.ServiceInstanceID(*app.id),
semconv.ServiceNamespace("webitel"),
)),
)
if err != nil {
return nil, err
}
}
app.Log = wlog.NewLogger(logConfig)

wlog.RedirectStdLog(app.Log)
wlog.InitGlobalLogger(app.Log)

wlog.Info("server is initializing...")
app.Log.Info("server is initializing...")

if app.newStore == nil {
app.newStore = func() store.Store {
Expand All @@ -72,15 +118,15 @@ func New(options ...string) (outApp *App, outErr error) {
}

app.Store = app.newStore()
app.MQ = mq.NewMQ(rabbit.NewRabbitMQ(app.Config().MessageQueueSettings, app.GetInstanceId()))
app.MQ = mq.NewMQ(rabbit.NewRabbitMQ(app.Config().MessageQueueSettings, app.GetInstanceId(), app.Log))

if cl, err := cluster.NewCluster(*app.id, app.Config().DiscoverySettings.Url, app.Store.Cluster()); err != nil {
if cl, err := cluster.NewCluster(*app.id, app.Config().DiscoverySettings.Url, app.Store.Cluster(), app.Log); err != nil {
return nil, err
} else {
app.cluster = cl
}

app.GrpcServer = NewGrpcServer(app.Config().ServerSettings)
app.GrpcServer = NewGrpcServer(app.Config().ServerSettings, app.Log)

if err := app.cluster.Setup(); err != nil {
return nil, errors.Wrapf(err, "unable to initialize cluster")
Expand All @@ -90,14 +136,14 @@ func New(options ...string) (outApp *App, outErr error) {
return nil, err
}

app.callManager = call_manager.NewCallManager(app.GetInstanceId(), app.Cluster().ServiceDiscovery(), app.MQ)
app.callManager = call_manager.NewCallManager(app.GetInstanceId(), app.Cluster().ServiceDiscovery(), app.MQ, app.Log)
app.callManager.Start()

app.engine = engine.NewEngine(app, *app.id, app.Store, app.Config().QueueSettings.EnableOmnichannel,
app.Config().QueueSettings.PollingInterval)
app.Config().QueueSettings.PollingInterval, app.Log)
app.engine.Start()

app.agentManager = agent_manager.NewAgentManager(app.GetInstanceId(), app.Store, app.MQ)
app.agentManager = agent_manager.NewAgentManager(app.GetInstanceId(), app.Store, app.MQ, app.Log)
app.agentManager.SetHookAutoOfflineAgent(app.hookAutoOfflineAgent)
app.agentManager.Start()

Expand All @@ -106,15 +152,15 @@ func New(options ...string) (outApp *App, outErr error) {
return nil, err
}

app.chatManager = chat.NewChatManager(app.Cluster().ServiceDiscovery(), app.MQ)
app.chatManager = chat.NewChatManager(app.Cluster().ServiceDiscovery(), app.MQ, app.Log)
if err := app.chatManager.Start(); err != nil {
return nil, err
}

app.dialing = queue.NewDialing(app, app.MQ, app.callManager, app.agentManager, app.Store, app.Config().QueueSettings.BridgeSleep)
app.dialing.Start()

app.triggerManager = trigger.NewManager(*app.id, app.Store, app.flowManager)
app.triggerManager = trigger.NewManager(*app.id, app.Store, app.flowManager, app.Log)
if err := app.triggerManager.Start(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,7 +190,7 @@ func (app *App) QueueSettings() model.QueueSettings {
}

func (app *App) Shutdown() {
wlog.Info("stopping Server...")
app.Log.Info("stopping Server...")

if app.cluster != nil {
app.cluster.Stop()
Expand Down Expand Up @@ -181,6 +227,10 @@ func (app *App) Shutdown() {
if app.MQ != nil {
app.MQ.Close()
}

if app.otelShutdownFunc != nil {
app.otelShutdownFunc(app.ctx)
}
}

func (a *App) setServiceId(id *string) {
Expand Down
Loading

0 comments on commit a72888f

Please sign in to comment.