Skip to content

Commit

Permalink
add resource labels and custom labels for single workspace queries
Browse files Browse the repository at this point in the history
Signed-off-by: Markus Blaschke <[email protected]>
  • Loading branch information
mblaschke committed Jun 19, 2024
1 parent 799c760 commit ddcb51f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 32 deletions.
1 change: 1 addition & 0 deletions config/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type (
ServiceDiscovery struct {
CacheDuration *time.Duration `long:"azure.servicediscovery.cache" env:"AZURE_SERVICEDISCOVERY_CACHE" description:"Duration for caching Azure ServiceDiscovery of workspaces to reduce API calls (time.Duration)" default:"30m"`
}
ResourceTags []string `long:"azure.resource-tag" env:"AZURE_RESOURCE_TAG" env-delim:" " description:"Azure Resource tags (space delimiter)" default:"owner"`
}

Loganalytics struct {
Expand Down
102 changes: 74 additions & 28 deletions loganalytics/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ type (
Client *armclient.ArmClient
}

workspaceList []string
tagManagerConfig *armclient.ResourceTagManager

workspaceList []WorkspaceConfig

request *http.Request
response http.ResponseWriter
Expand All @@ -65,6 +67,12 @@ type (
concurrencyWaitGroup *sizedwaitgroup.SizedWaitGroup
}

WorkspaceConfig struct {
ResourceID string
CustomerID string
Labels map[string]string
}

LogAnalyticsProbeResult struct {
WorkspaceId string
Name string
Expand All @@ -80,7 +88,7 @@ type (
func NewLogAnalyticsProber(logger *zap.SugaredLogger, w http.ResponseWriter, r *http.Request, concurrencyWaitGroup *sizedwaitgroup.SizedWaitGroup) *LogAnalyticsProber {
prober := LogAnalyticsProber{}
prober.logger = logger
prober.workspaceList = []string{}
prober.workspaceList = []WorkspaceConfig{}
prober.request = r
prober.response = w
prober.ctx = context.Background()
Expand Down Expand Up @@ -121,6 +129,13 @@ func (p *LogAnalyticsProber) Init() {
),
)
}

tagManagerConfig, err := p.Azure.Client.TagManager.ParseTagConfig(p.Conf.Azure.ResourceTags)
if err != nil {
p.logger.Fatal(err)
}

p.tagManagerConfig = tagManagerConfig
}

func (p *LogAnalyticsProber) EnableCache(cache *cache.Cache) {
Expand All @@ -135,21 +150,44 @@ func (p *LogAnalyticsProber) GetPrometheusRegistry() *prometheus.Registry {
return p.registry
}

func (p *LogAnalyticsProber) AddWorkspaces(workspaces ...string) {
for _, workspace := range workspaces {

if strings.HasPrefix(workspace, "/subscriptions/") {
workspaceResource, err := p.ServiceDiscovery.GetWorkspace(p.ctx, workspace)
if err != nil {
p.logger.Panic(err)
}
func (p *LogAnalyticsProber) translateWorkspaceIntoConfig(val string) WorkspaceConfig {
workspaceConfig := WorkspaceConfig{
Labels: map[string]string{},
}

workspace = to.String(workspaceResource.Properties.CustomerID)
if strings.HasPrefix(val, "/subscriptions/") {
workspaceResource, err := p.ServiceDiscovery.GetWorkspace(p.ctx, val)
if err != nil {
p.logger.Panic(err)
}

p.workspaceList = append(p.workspaceList, workspace)
workspaceConfig.ResourceID = to.String(workspaceResource.ID)
workspaceConfig.CustomerID = to.String(workspaceResource.Properties.CustomerID)

if resourceInfo, err := armclient.ParseResourceId(workspaceConfig.ResourceID); err == nil {
workspaceConfig.Labels["resourceID"] = workspaceConfig.ResourceID
workspaceConfig.Labels["resourceGroup"] = resourceInfo.ResourceGroup
workspaceConfig.Labels["resourceName"] = resourceInfo.ResourceName

// add custom labels
workspaceConfig.Labels = p.tagManagerConfig.AddResourceTagsToPrometheusLabels(
p.ctx,
workspaceConfig.Labels,
workspaceConfig.ResourceID,
)
}
} else {
// no resource id, must be a customer id
workspaceConfig.CustomerID = val
}

return workspaceConfig
}

func (p *LogAnalyticsProber) AddWorkspaces(workspaces ...string) {
for _, item := range workspaces {
p.workspaceList = append(p.workspaceList, p.translateWorkspaceIntoConfig(item))
}
}

func (p *LogAnalyticsProber) Run() {
Expand Down Expand Up @@ -239,7 +277,10 @@ func (p *LogAnalyticsProber) executeQueries() error {

workspaceList := p.workspaceList
if queryRow.Workspaces != nil && len(*queryRow.Workspaces) >= 1 {
workspaceList = *queryRow.Workspaces
workspaceList = []WorkspaceConfig{}
for _, workspace := range *queryRow.Workspaces {
workspaceList = append(workspaceList, p.translateWorkspaceIntoConfig(workspace))
}
}

if len(workspaceList) == 0 {
Expand Down Expand Up @@ -279,9 +320,9 @@ func (p *LogAnalyticsProber) executeQueries() error {
}()
case "", "single":
for _, row := range workspaceList {
workspaceId := row
workspaceConfig := row
// Run the query and get the results
prometheusQueryRequests.With(prometheus.Labels{"workspaceID": workspaceId, "module": p.config.moduleName, "metric": queryConfig.Metric}).Inc()
prometheusQueryRequests.With(prometheus.Labels{"workspaceID": workspaceConfig.CustomerID, "module": p.config.moduleName, "metric": queryConfig.Metric}).Inc()

wgProbes.Add(1)
p.concurrencyWaitGroup.Add()
Expand All @@ -290,7 +331,7 @@ func (p *LogAnalyticsProber) executeQueries() error {
defer p.concurrencyWaitGroup.Done()
p.sendQueryToSingleWorkspace(
contextLogger,
workspaceId,
workspaceConfig,
queryConfig,
resultChannel,
)
Expand Down Expand Up @@ -344,7 +385,7 @@ func (p *LogAnalyticsProber) executeQueries() error {
return nil
}

func (p *LogAnalyticsProber) queryWorkspace(workspaces []string, queryConfig kusto.ConfigQuery) (azquery.LogsClientQueryWorkspaceResponse, error) {
func (p *LogAnalyticsProber) queryWorkspace(workspaces []WorkspaceConfig, queryConfig kusto.ConfigQuery) (azquery.LogsClientQueryWorkspaceResponse, error) {
clientOpts := azquery.LogsClientOptions{ClientOptions: *p.Azure.Client.NewAzCoreClientOptions()}
logsClient, err := azquery.NewLogsClient(p.Azure.Client.GetCred(), &clientOpts)
if err != nil {
Expand All @@ -359,8 +400,8 @@ func (p *LogAnalyticsProber) queryWorkspace(workspaces []string, queryConfig kus

additionalWorkspaces := []*string{}
if len(workspaces) > 1 {
for _, workspaceId := range workspaces[1:] {
additionalWorkspaces = append(additionalWorkspaces, to.StringPtr(workspaceId))
for _, workspaceConfig := range workspaces[1:] {
additionalWorkspaces = append(additionalWorkspaces, to.StringPtr(workspaceConfig.CustomerID))
}
}

Expand All @@ -371,13 +412,13 @@ func (p *LogAnalyticsProber) queryWorkspace(workspaces []string, queryConfig kus
AdditionalWorkspaces: additionalWorkspaces,
}

return logsClient.QueryWorkspace(p.ctx, workspaces[0], queryBody, &opts)
return logsClient.QueryWorkspace(p.ctx, workspaces[0].CustomerID, queryBody, &opts)
}

func (p *LogAnalyticsProber) sendQueryToMultipleWorkspace(logger *zap.SugaredLogger, workspaces []string, queryConfig kusto.ConfigQuery, result chan<- LogAnalyticsProbeResult) {
func (p *LogAnalyticsProber) sendQueryToMultipleWorkspace(logger *zap.SugaredLogger, workspaces []WorkspaceConfig, queryConfig kusto.ConfigQuery, result chan<- LogAnalyticsProbeResult) {
workspaceLogger := logger.With(zap.Any("workspaceId", workspaces))

workspaceLogger.With(zap.String("query", queryConfig.Query)).Debug("send query to loganaltyics workspaces")
workspaceLogger.With(zap.String("query", queryConfig.Query)).Debug("send query to logAnalytics workspaces")

queryResults, queryErr := p.queryWorkspace(workspaces, queryConfig)
if queryErr != nil {
Expand Down Expand Up @@ -424,12 +465,12 @@ func (p *LogAnalyticsProber) sendQueryToMultipleWorkspace(logger *zap.SugaredLog
logger.Debug("metrics parsed")
}

func (p *LogAnalyticsProber) sendQueryToSingleWorkspace(logger *zap.SugaredLogger, workspaceId string, queryConfig kusto.ConfigQuery, result chan<- LogAnalyticsProbeResult) {
workspaceLogger := logger.With(zap.String("workspaceId", workspaceId))
func (p *LogAnalyticsProber) sendQueryToSingleWorkspace(logger *zap.SugaredLogger, workspaceConfig WorkspaceConfig, queryConfig kusto.ConfigQuery, result chan<- LogAnalyticsProbeResult) {
workspaceLogger := logger.With(zap.String("workspaceId", workspaceConfig.CustomerID))

workspaceLogger.With(zap.String("query", queryConfig.Query)).Debug("send query to loganaltyics workspace")
workspaceLogger.With(zap.String("query", queryConfig.Query)).Debug("send query to logAnalytics workspace")

queryResults, queryErr := p.queryWorkspace([]string{workspaceId}, queryConfig)
queryResults, queryErr := p.queryWorkspace([]WorkspaceConfig{workspaceConfig}, queryConfig)
if queryErr != nil {
workspaceLogger.Error(queryErr.Error())
result <- LogAnalyticsProbeResult{
Expand Down Expand Up @@ -459,11 +500,16 @@ func (p *LogAnalyticsProber) sendQueryToSingleWorkspace(logger *zap.SugaredLogge
// inject workspaceId
for num := range metric {
metric[num].Labels["workspaceTable"] = to.String(table.Name)
metric[num].Labels["workspaceID"] = workspaceId
metric[num].Labels["workspaceID"] = workspaceConfig.CustomerID

// add labels from resource config
for labelName, labelValue := range workspaceConfig.Labels {
metric[num].Labels[labelName] = labelValue
}
}

result <- LogAnalyticsProbeResult{
WorkspaceId: workspaceId,
WorkspaceId: workspaceConfig.CustomerID,
Name: metricName,
Metrics: metric,
}
Expand Down
5 changes: 1 addition & 4 deletions loganalytics/servicediscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ func (sd *LogAnalyticsServiceDiscovery) findWorkspaces(logger *zap.SugaredLogger
}

for _, row := range result {
prober.workspaceList = append(
prober.workspaceList,
row["customerId"].(string),
)
prober.AddWorkspaces(row["id"].(string))
}
}

0 comments on commit ddcb51f

Please sign in to comment.