Skip to content

Commit

Permalink
Added support for structured logging in csaf_aggretator
Browse files Browse the repository at this point in the history
This PR adds structured logging for the aggregator service. Currently, only the text handler is used, but I can extend this to use the JSON handler as well. In this case, probably some code that is shared between the aggregator and the downloader would need to be moved to a common package.

I was also wondering, whether this repo is moving to Go 1.21 at the future, since `slog` was introduced in to the standard lib in 1.21. So currently, this still relies on the `x/exp` package.

Fixes #462
  • Loading branch information
oxisto committed Apr 18, 2024
1 parent d909e9d commit 0e95ed4
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 72 deletions.
21 changes: 17 additions & 4 deletions cmd/csaf_aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"net/http"
"os"
"runtime"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/csaf-poc/csaf_distribution/v3/internal/models"
"github.com/csaf-poc/csaf_distribution/v3/internal/options"
"github.com/csaf-poc/csaf_distribution/v3/util"
"golang.org/x/exp/slog"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -178,9 +178,11 @@ func (p *provider) ageAccept(c *config) func(time.Time) bool {
}

if c.Verbose {
log.Printf(
"Setting up filter to accept advisories within time range %s to %s\n",
r[0].Format(time.RFC3339), r[1].Format(time.RFC3339))
slog.Debug(
"Setting up filter to accept advisories within time range",
"from", r[0].Format(time.RFC3339),
"to", r[1].Format(time.RFC3339),
)
}
return r.Contains
}
Expand Down Expand Up @@ -393,6 +395,17 @@ func (c *config) setDefaults() {
}
}

// prepareLogging sets up the structured logging.
func (cfg *config) prepareLogging() error {
ho := slog.HandlerOptions{
Level: slog.LevelDebug,
}
handler := slog.NewTextHandler(os.Stdout, &ho)
logger := slog.New(handler)
slog.SetDefault(logger)
return nil
}

// compileIgnorePatterns compiles the configured patterns to be ignored.
func (p *provider) compileIgnorePatterns() error {
pm, err := filter.NewPatternMatcher(p.IgnorePattern)
Expand Down
38 changes: 26 additions & 12 deletions cmd/csaf_aggregator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package main
import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"strings"
Expand All @@ -20,6 +19,7 @@ import (

"github.com/csaf-poc/csaf_distribution/v3/csaf"
"github.com/csaf-poc/csaf_distribution/v3/util"
"golang.org/x/exp/slog"
)

type fullJob struct {
Expand All @@ -29,11 +29,13 @@ type fullJob struct {
err error
}

// setupProviderFull fetches the provider-metadate.json for a specific provider.
// setupProviderFull fetches the provider-metadata.json for a specific provider.
func (w *worker) setupProviderFull(provider *provider) error {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)

w.log.Info("Setting up provider",
"provider", slog.GroupValue(
slog.String("name", provider.Name),
slog.String("domain", provider.Domain),
))
w.dir = ""
w.provider = provider

Expand All @@ -55,7 +57,7 @@ func (w *worker) setupProviderFull(provider *provider) error {
"provider-metadata.json has %d validation issues", len(errors))
}

log.Printf("provider-metadata: %s\n", w.loc)
w.log.Info("Using provider-metadata", "url", w.loc)
return nil
}

Expand All @@ -79,7 +81,7 @@ func (w *worker) fullWork(wg *sync.WaitGroup, jobs <-chan *fullJob) {
func (p *processor) full() error {

if p.cfg.runAsMirror() {
log.Println("Running in aggregator mode")
p.log.Info("Running in aggregator mode")

// check if we need to setup a remote validator
if p.cfg.RemoteValidatorOptions != nil {
Expand All @@ -96,16 +98,18 @@ func (p *processor) full() error {
}()
}
} else {
log.Println("Running in lister mode")
p.log.Info("Running in lister mode")
}

queue := make(chan *fullJob)
var wg sync.WaitGroup

log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)

for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)

go w.fullWork(&wg, queue)
}

Expand Down Expand Up @@ -135,12 +139,22 @@ func (p *processor) full() error {
for i := range jobs {
j := &jobs[i]
if j.err != nil {
log.Printf("error: '%s' failed: %v\n", j.provider.Name, j.err)
p.log.Error("Job execution failed",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
"err", j.err,
)
continue
}
if j.aggregatorProvider == nil {
log.Printf(
"error: '%s' does not produce any result.\n", j.provider.Name)
p.log.Error("Job did not produce any result",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
)
continue
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/csaf_aggregator/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"bufio"
"encoding/csv"
"fmt"
"log"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -377,7 +376,7 @@ func (w *worker) writeIndices() error {
}

for label, summaries := range w.summaries {
log.Printf("%s: %d\n", label, len(summaries))
w.log.Debug("Writing indices", "label", label, "summaries.num", len(summaries))
if err := w.writeInterims(label, summaries); err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions cmd/csaf_aggregator/interim.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -102,12 +101,12 @@ func (w *worker) checkInterims(

// XXX: Should we return an error here?
for _, e := range errors {
log.Printf("validation error: %s: %v\n", url, e)
w.log.Error("validation error", "url", url, "err", e)
}

// We need to write the changed content.

// This will start the transcation if not already started.
// This will start the transaction if not already started.
dst, err := tx.Dst()
if err != nil {
return nil, err
Expand Down Expand Up @@ -159,8 +158,7 @@ func (w *worker) checkInterims(

// setupProviderInterim prepares the worker for a specific provider.
func (w *worker) setupProviderInterim(provider *provider) {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)
w.log.Info("Setting up worker", provider.Name, provider.Domain)

w.dir = ""
w.provider = provider
Expand Down Expand Up @@ -262,7 +260,7 @@ func (p *processor) interim() error {
queue := make(chan *interimJob)
var wg sync.WaitGroup

log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)
for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)
Expand Down
5 changes: 3 additions & 2 deletions cmd/csaf_aggregator/lazytransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
package main

import (
"log"
"os"
"path/filepath"

"github.com/csaf-poc/csaf_distribution/v3/util"
"golang.org/x/exp/slog"
)

type lazyTransaction struct {
Expand Down Expand Up @@ -85,7 +85,8 @@ func (lt *lazyTransaction) commit() error {
os.RemoveAll(lt.dst)
return err
}
log.Printf("Move %q -> %q\n", symlink, lt.src)

slog.Debug("Moving directory", "from", symlink, "to", lt.src)
if err := os.Rename(symlink, lt.src); err != nil {
os.RemoveAll(lt.dst)
return err
Expand Down
11 changes: 7 additions & 4 deletions cmd/csaf_aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"path/filepath"

"github.com/csaf-poc/csaf_distribution/v3/internal/options"

"github.com/gofrs/flock"
"golang.org/x/exp/slog"
)

func lock(lockFile *string, fn func() error) error {
Expand Down Expand Up @@ -44,8 +46,9 @@ func lock(lockFile *string, fn func() error) error {

func main() {
_, cfg, err := parseArgsConfig()
options.ErrorCheck(err)
options.ErrorCheck(cfg.prepare())
p := processor{cfg: cfg}
options.ErrorCheck(lock(cfg.LockFile, p.process))
cfg.prepareLogging()
options.ErrorCheckStructured(err)
options.ErrorCheckStructured(cfg.prepare())
p := processor{cfg: cfg, log: slog.Default()}
options.ErrorCheckStructured(lock(cfg.LockFile, p.process))
}
Loading

0 comments on commit 0e95ed4

Please sign in to comment.