Skip to content

Commit

Permalink
[WTEL-4049]: merge
Browse files Browse the repository at this point in the history
  • Loading branch information
Yehor Datsenko committed Nov 27, 2023
2 parents d179540 + 146355e commit 532c2af
Show file tree
Hide file tree
Showing 18 changed files with 1,658 additions and 292 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ nfpms:
bindir: /usr/local/bin

# Version Release.
release: 23.09
release: 23.12

# Section.
section: default
Expand Down
13 changes: 7 additions & 6 deletions app/api_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ type LoggerService struct {
app *App
}

func NewLoggerService(app *App) (*LoggerService, errors.AppError) {
if app == nil {
return nil, errors.NewInternalError("api.config.new_logger_service.args_check.app_nil", "app is nil")
}
return &LoggerService{app: app}, nil
}

func (s *LoggerService) SearchLogByRecordId(ctx context.Context, in *proto.SearchLogByRecordIdRequest) (*proto.Logs, error) {
session, err := s.app.GetSessionFromCtx(ctx)
if err != nil {
Expand All @@ -28,12 +35,6 @@ func (s *LoggerService) SearchLogByRecordId(ctx context.Context, in *proto.Searc
}
return s.app.SearchLogsByRecordId(ctx, in)
}
func NewLoggerService(app *App) (*LoggerService, errors.AppError) {
if app == nil {
return nil, errors.NewInternalError("api.config.new_logger_service.args_check.app_nil", "app is nil")
}
return &LoggerService{app: app}, nil
}

func (s *LoggerService) SearchLogByUserId(ctx context.Context, in *proto.SearchLogByUserIdRequest) (*proto.Logs, error) {
session, err := s.app.GetSessionFromCtx(ctx)
Expand Down
26 changes: 22 additions & 4 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,26 @@ import (
"fmt"
"strings"

"github.com/webitel/engine/auth_manager"
"github.com/webitel/engine/discovery"
_ "github.com/mbobakov/grpc-consul-resolver"
"github.com/webitel/logger/model"
"github.com/webitel/logger/pkg/cache"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/webitel/engine/auth_manager"
"github.com/webitel/engine/discovery"
"github.com/webitel/logger/storage"
"github.com/webitel/logger/storage/postgres"
"github.com/webitel/logger/watcher"
strg "github.com/webitel/protos/storage"
"google.golang.org/grpc/metadata"

errors "github.com/webitel/engine/model"
)

const (
DeleteWatcherPrefix = "config.watcher"
DeleteWatcherPrefix = "config.delete.watcher"
UploadWatcherPrefix = "config.upload.watcher"
SESSION_CACHE_SIZE = 35000
SESSION_CACHE_TIME = 60 * 5

Expand All @@ -31,7 +37,9 @@ const (
type App struct {
config *model.AppConfig
storage storage.Storage
watchers map[string]*watcher.Watcher
file strg.FileServiceClient
uploadWatchers map[string]*watcher.UploadWatcher
deleteWatchers map[string]*watcher.Watcher
serviceDiscovery discovery.ServiceDiscovery
sessionManager auth_manager.AuthManager
cache cache.CacheStore
Expand Down Expand Up @@ -81,6 +89,16 @@ func New(config *model.AppConfig) (*App, errors.AppError) {
return nil, appErr
}
app.server = s

conn, err := grpc.Dial(fmt.Sprintf("consul://%s/storage?wait=14s", config.Consul.Address),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, errors.NewInternalError("app.app.new_app.grpc_conn.error", err.Error())
}

app.file = strg.NewFileServiceClient(conn)
return app, nil
}

Expand Down
140 changes: 104 additions & 36 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package app

import (
"context"
"fmt"
"strings"
"time"

"github.com/webitel/logger/watcher"

"github.com/webitel/wlog"

"github.com/webitel/engine/auth_manager"
errors "github.com/webitel/engine/model"
"github.com/webitel/logger/model"
Expand All @@ -13,12 +18,12 @@ import (

func (a *App) UpdateConfig(ctx context.Context, in *proto.UpdateConfigRequest, domainId int, userId int) (*proto.Config, errors.AppError) {
var (
result *model.Config
newConfig *model.Config
)
if in == nil {
return nil, errors.NewInternalError("app.app.update_config.check_arguments.fail", "config proto is nil")
}
oldModel, err := a.storage.Config().GetById(ctx, nil, int(in.GetConfigId()))
oldConfig, err := a.storage.Config().GetById(ctx, nil, int(in.GetConfigId()))
if err != nil {
return nil, err
}
Expand All @@ -28,14 +33,14 @@ func (a *App) UpdateConfig(ctx context.Context, in *proto.UpdateConfigRequest, d
if err != nil {
return nil, err
}
result, err = a.storage.Config().Update(ctx, model, []string{}, userId)
newConfig, err = a.storage.Config().Update(ctx, model, []string{}, userId)
if err != nil {
return nil, err
}

a.UpdateDeleteWatcherForConfig(oldModel.Enabled == true && result.Enabled == false, result.DaysToStore, result.Id, domainId, result.Object.Id.Int())
a.UpdateConfigWatchers(oldConfig, newConfig)

res, err := a.convertConfigModelToMessage(result)
res, err := a.convertConfigModelToMessage(newConfig)
if err != nil {
return nil, err
}
Expand All @@ -45,29 +50,26 @@ func (a *App) UpdateConfig(ctx context.Context, in *proto.UpdateConfigRequest, d

func (a *App) PatchUpdateConfig(ctx context.Context, in *proto.PatchConfigRequest, domainId int, userId int) (*proto.Config, errors.AppError) {
var (
result *model.Config
newConfig *model.Config
)
if in == nil {
return nil, errors.NewInternalError("app.app.update_config.check_arguments.fail", "config proto is nil")
}
oldModel, err := a.storage.Config().GetById(ctx, nil, int(in.GetConfigId()))
oldConfig, err := a.storage.Config().GetById(ctx, nil, int(in.GetConfigId()))
if err != nil {
return nil, err
}

model, err := a.convertPatchConfigMessageToModel(in, domainId)

updatedConfigModel, err := a.convertPatchConfigMessageToModel(in, domainId)
if err != nil {
return nil, err
}
result, err = a.storage.Config().Update(ctx, model, in.GetFields(), userId)
newConfig, err = a.storage.Config().Update(ctx, updatedConfigModel, in.GetFields(), userId)
if err != nil {
return nil, err
}

a.UpdateDeleteWatcherForConfig(oldModel.Enabled == true && result.Enabled == false, result.DaysToStore, result.Id, domainId, result.Object.Id.Int())

res, err := a.convertConfigModelToMessage(result)
a.UpdateConfigWatchers(oldConfig, newConfig)
res, err := a.convertConfigModelToMessage(newConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,17 +101,72 @@ func (a *App) GetSystemObjects(ctx context.Context, in *proto.ReadSystemObjectsR

}

func (a *App) UpdateDeleteWatcherForConfig(statusChangedToDisabled bool, daysToStore, configId, domainId, objectId int) {
watcherName := FormatKey(DeleteWatcherPrefix, domainId, objectId)
if statusChangedToDisabled {
a.DeleteWatcherByKey(watcherName)
} else {
if a.GetWatcherByKey(watcherName) != nil {
a.UpdateDeleteWatcherWithNewInterval(configId, daysToStore)
func (a *App) UpdateConfigWatchers(oldConfig, newConfig *model.Config) {
configId := newConfig.Id
domainId := newConfig.DomainId
if !newConfig.Enabled && oldConfig.Enabled { // changed to disabled
a.DeleteWatchers(configId)
wlog.Info(fmt.Sprintf("config with id %d disabled... watchers have been deleted !", configId))
} else if newConfig.Enabled && oldConfig.Enabled || (newConfig.Enabled && !oldConfig.Enabled) { // status wasn't changed and it's still enabled OR changed to enabled

if newConfig.DaysToStore != oldConfig.DaysToStore { // if days to store changed
if a.GetLogCleaner(configId) != nil { // if upload watcher exists then update it's new days to store
a.UpdateLogCleanerWithNewInterval(configId, newConfig.DaysToStore)
wlog.Info(fmt.Sprintf("config with id %d changed it's log capacity... watcher have been notified and updated !", configId))
} else {
a.InsertLogCleaner(configId, nil, newConfig.DaysToStore)
wlog.Info(fmt.Sprintf("config with id %d changed it's log capacity... new watcher have been created !", configId))
}
}

//if newConfig.Period != oldConfig.Period { // if period changed
// if params := a.GetLogUploaderParams(configId); params != nil {
// params.Period = newConfig.Period
// wlog.Info(fmt.Sprintf("config with id %d changed it's upload period... watcher have been notified and updated !", configId))
// } else {
// a.InsertLogUploader(configId, &watcher.UploadWatcherParams{
// StorageId: newConfig.Storage.Id.Int(),
// Period: newConfig.Period,
// NextUploadOn: newConfig.NextUploadOn.Time(),
// LastLogId: newConfig.LastUploadedLog.Int(),
// DomainId: domainId,
// })
// wlog.Info(fmt.Sprintf("config with id %d changed it's upload period... new watcher have been created !", configId))
// }
//}
//
//if newConfig.Storage.Id.Int() != oldConfig.Storage.Id.Int() { // if storage changed
// if params := a.GetLogUploaderParams(configId); params != nil {
// params.StorageId = newConfig.Storage.Id.Int()
// wlog.Info(fmt.Sprintf("config with id %d changed it's upload period... watcher have been notified and updated !", configId))
// } else {
// a.InsertLogUploader(configId, &watcher.UploadWatcherParams{
// StorageId: newConfig.Storage.Id.Int(),
// Period: newConfig.Period,
// NextUploadOn: newConfig.NextUploadOn.Time(),
// LastLogId: newConfig.LastUploadedLog.Int(),
// DomainId: domainId,
// })
// wlog.Info(fmt.Sprintf("config with id %d changed it's upload period... new watcher have been created !", configId))
// }
//}
if params := a.GetLogUploaderParams(configId); params != nil {
params.UserId = newConfig.UpdatedBy.Int()
params.StorageId = newConfig.Storage.Id.Int()
params.Period = newConfig.Period
params.NextUploadOn = newConfig.NextUploadOn.Time()
} else {
a.InsertNewDeleteWatcher(configId, daysToStore)
a.InsertLogUploader(configId, nil, &watcher.UploadWatcherParams{
StorageId: newConfig.Storage.Id.Int(),
Period: newConfig.Period,
NextUploadOn: newConfig.NextUploadOn.Time(),
LastLogId: newConfig.LastUploadedLog.Int(),
DomainId: domainId,
})
}
wlog.Info(fmt.Sprintf("config with id %d updated... watchers have been updated too !", configId))
}
// else status still disabled
}

func (a *App) InsertConfig(ctx context.Context, in *proto.CreateConfigRequest, domainId int, userId int) (*proto.Config, errors.AppError) {
Expand All @@ -128,15 +185,21 @@ func (a *App) InsertConfig(ctx context.Context, in *proto.CreateConfigRequest, d
return nil, err
}
if newModel.Enabled {
a.InsertNewDeleteWatcher(newModel.Id, newModel.DaysToStore)
a.InsertLogCleaner(newModel.Id, nil, newModel.DaysToStore)
a.InsertLogUploader(newModel.Id, nil, &watcher.UploadWatcherParams{
StorageId: newModel.Storage.Id.Int(),
Period: newModel.Period,
NextUploadOn: newModel.NextUploadOn.Time(),
LastLogId: 0,
DomainId: domainId,
})
}

res, err := a.convertConfigModelToMessage(newModel)
if err != nil {
return nil, err
}
return res, nil

}

func (a *App) GetConfigByObjectId(ctx context.Context /*opt *model.SearchOptions,*/, domainId int, objectId int) (*proto.Config, errors.AppError) {
Expand Down Expand Up @@ -300,10 +363,10 @@ func (a *App) convertUpdateConfigMessageToModel(in *proto.UpdateConfigRequest, d
DaysToStore: int(in.GetDaysToStore()),
Period: int(in.GetPeriod()),
//Storage.Id: int(in.GetStorageId()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
NextUploadOn: *model.NewNullTime(calculateNextPeriodFromNow(in.Period)),
}
a.calculateNextPeriod(config)

if v := in.GetStorage().GetId(); v != 0 {
storageId, err := model.NewNullInt(in.GetStorage().GetId())
Expand All @@ -323,11 +386,10 @@ func (a *App) convertPatchConfigMessageToModel(in *proto.PatchConfigRequest, dom
DaysToStore: int(in.GetDaysToStore()),
Period: int(in.GetPeriod()),
//Storage.Id: int(in.GetStorageId()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
NextUploadOn: *model.NewNullTime(calculateNextPeriodFromNow(in.Period)),
}
a.calculateNextPeriod(config)

if v := in.GetStorage().GetId(); v != 0 {
storageId, err := model.NewNullInt(in.GetStorage().GetId())
if err != nil {
Expand All @@ -350,10 +412,10 @@ func (a *App) convertCreateConfigMessageToModel(in *proto.CreateConfigRequest, d
DaysToStore: int(in.GetDaysToStore()),
Period: int(in.GetPeriod()),
//StorageId: int(in.GetStorageId()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
DomainId: domainId,
Description: *model.NewNullString(in.GetDescription()),
NextUploadOn: *model.NewNullTime(calculateNextPeriodFromNow(in.Period)),
}
a.calculateNextPeriod(config)
objectId, err := model.NewNullInt(in.GetObject().GetId())
if err != nil {
return nil, errors.NewInternalError("app.config.convert_create_config_message.convert_object_id.fail", err.Error())
Expand Down Expand Up @@ -396,6 +458,12 @@ func (a *App) convertConfigModelToMessage(in *model.Config) (*proto.Config, erro
return conf, nil
}

func (a *App) calculateNextPeriod(in *model.Config) {
in.NextUploadOn = *model.NewNullTime(time.Now().Add(time.Hour * 24 * time.Duration(in.Period)))
func calculateNextPeriodFromDate(period int, from time.Time) *model.NullTime {
return model.NewNullTime(from.Add(time.Hour * 24 * time.Duration(period)))

}

func calculateNextPeriodFromNow(period int32) time.Time {
now := time.Now().Add(time.Hour * 24 * time.Duration(period))
return now
}
Loading

0 comments on commit 532c2af

Please sign in to comment.