Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for structured logging in csaf_aggregator #530

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Download the binaries from the most recent release assets on Github.

### Build from sources

- A recent version of **Go** (1.20+) should be installed. [Go installation](https://go.dev/doc/install)
- A recent version of **Go** (1.21+) should be installed. [Go installation](https://go.dev/doc/install)

- Clone the repository `git clone https://github.com/csaf-poc/csaf_distribution.git `

Expand Down
21 changes: 17 additions & 4 deletions cmd/csaf_aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"runtime"
Expand Down Expand Up @@ -178,9 +178,11 @@ func (p *provider) ageAccept(c *config) func(time.Time) bool {
}

if c.Verbose {
log.Printf(
"Setting up filter to accept advisories within time range %s to %s\n",
r[0].Format(time.RFC3339), r[1].Format(time.RFC3339))
slog.Debug(
"Setting up filter to accept advisories within time range",
"from", r[0].Format(time.RFC3339),
"to", r[1].Format(time.RFC3339),
)
}
return r.Contains
}
Expand Down Expand Up @@ -393,6 +395,17 @@ func (c *config) setDefaults() {
}
}

// prepareLogging sets up the structured logging.
func (c *config) prepareLogging() error {
ho := slog.HandlerOptions{
Level: slog.LevelDebug,
}
handler := slog.NewTextHandler(os.Stdout, &ho)
logger := slog.New(handler)
slog.SetDefault(logger)
return nil
}

// compileIgnorePatterns compiles the configured patterns to be ignored.
func (p *provider) compileIgnorePatterns() error {
pm, err := filter.NewPatternMatcher(p.IgnorePattern)
Expand Down
38 changes: 26 additions & 12 deletions cmd/csaf_aggregator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package main
import (
"errors"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
Expand All @@ -29,11 +29,13 @@ type fullJob struct {
err error
}

// setupProviderFull fetches the provider-metadate.json for a specific provider.
// setupProviderFull fetches the provider-metadata.json for a specific provider.
func (w *worker) setupProviderFull(provider *provider) error {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)

w.log.Info("Setting up provider",
"provider", slog.GroupValue(
slog.String("name", provider.Name),
slog.String("domain", provider.Domain),
))
w.dir = ""
w.provider = provider

Expand All @@ -55,7 +57,7 @@ func (w *worker) setupProviderFull(provider *provider) error {
"provider-metadata.json has %d validation issues", len(errors))
}

log.Printf("provider-metadata: %s\n", w.loc)
w.log.Info("Using provider-metadata", "url", w.loc)
return nil
}

Expand All @@ -79,7 +81,7 @@ func (w *worker) fullWork(wg *sync.WaitGroup, jobs <-chan *fullJob) {
func (p *processor) full() error {

if p.cfg.runAsMirror() {
log.Println("Running in aggregator mode")
p.log.Info("Running in aggregator mode")

// check if we need to setup a remote validator
if p.cfg.RemoteValidatorOptions != nil {
Expand All @@ -96,16 +98,18 @@ func (p *processor) full() error {
}()
}
} else {
log.Println("Running in lister mode")
p.log.Info("Running in lister mode")
}

queue := make(chan *fullJob)
var wg sync.WaitGroup

log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)

for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)

go w.fullWork(&wg, queue)
}

Expand Down Expand Up @@ -135,12 +139,22 @@ func (p *processor) full() error {
for i := range jobs {
j := &jobs[i]
if j.err != nil {
log.Printf("error: '%s' failed: %v\n", j.provider.Name, j.err)
p.log.Error("Job execution failed",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
"err", j.err,
)
continue
}
if j.aggregatorProvider == nil {
log.Printf(
"error: '%s' does not produce any result.\n", j.provider.Name)
p.log.Error("Job did not produce any result",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
)
continue
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/csaf_aggregator/indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"bufio"
"encoding/csv"
"fmt"
"log"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -377,7 +376,7 @@ func (w *worker) writeIndices() error {
}

for label, summaries := range w.summaries {
log.Printf("%s: %d\n", label, len(summaries))
w.log.Debug("Writing indices", "label", label, "summaries.num", len(summaries))
if err := w.writeInterims(label, summaries); err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions cmd/csaf_aggregator/interim.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -102,12 +101,12 @@ func (w *worker) checkInterims(

// XXX: Should we return an error here?
for _, e := range errors {
log.Printf("validation error: %s: %v\n", url, e)
w.log.Error("validation error", "url", url, "err", e)
}

// We need to write the changed content.

// This will start the transcation if not already started.
// This will start the transaction if not already started.
dst, err := tx.Dst()
if err != nil {
return nil, err
Expand Down Expand Up @@ -159,8 +158,7 @@ func (w *worker) checkInterims(

// setupProviderInterim prepares the worker for a specific provider.
func (w *worker) setupProviderInterim(provider *provider) {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)
w.log.Info("Setting up worker", provider.Name, provider.Domain)

w.dir = ""
w.provider = provider
Expand Down Expand Up @@ -262,7 +260,7 @@ func (p *processor) interim() error {
queue := make(chan *interimJob)
var wg sync.WaitGroup

log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)
for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)
Expand Down
5 changes: 3 additions & 2 deletions cmd/csaf_aggregator/lazytransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package main

import (
"log"
"log/slog"
"os"
"path/filepath"

Expand Down Expand Up @@ -85,7 +85,8 @@ func (lt *lazyTransaction) commit() error {
os.RemoveAll(lt.dst)
return err
}
log.Printf("Move %q -> %q\n", symlink, lt.src)

slog.Debug("Moving directory", "from", symlink, "to", lt.src)
if err := os.Rename(symlink, lt.src); err != nil {
os.RemoveAll(lt.dst)
return err
Expand Down
11 changes: 7 additions & 4 deletions cmd/csaf_aggregator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ package main

import (
"fmt"
"log/slog"
"os"
"path/filepath"

"github.com/csaf-poc/csaf_distribution/v3/internal/options"

"github.com/gofrs/flock"
)

Expand Down Expand Up @@ -44,8 +46,9 @@ func lock(lockFile *string, fn func() error) error {

func main() {
_, cfg, err := parseArgsConfig()
options.ErrorCheck(err)
options.ErrorCheck(cfg.prepare())
p := processor{cfg: cfg}
options.ErrorCheck(lock(cfg.LockFile, p.process))
cfg.prepareLogging()
options.ErrorCheckStructured(err)
options.ErrorCheckStructured(cfg.prepare())
p := processor{cfg: cfg, log: slog.Default()}
options.ErrorCheckStructured(lock(cfg.LockFile, p.process))
}
Loading
Loading