Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start replacing logrus with slog #91

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 62 additions & 65 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"encoding/hex"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"
Expand All @@ -19,7 +20,6 @@
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -471,13 +471,13 @@

start := dates.Now()

log := logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"end_date": archive.endDate(),
"period": archive.Period,
})
log := slog.With(
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"end_date", archive.endDate(),
"period", archive.Period,
)

filename := fmt.Sprintf("%s_%d_%s%d%02d%02d_", archive.ArchiveType, archive.Org.ID, archive.Period, archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day())
file, err := os.CreateTemp(archivePath, filename)
Expand All @@ -490,7 +490,7 @@
if archive.ArchiveFile == "" {
err = os.Remove(file.Name())
if err != nil {
log.WithError(err).WithField("filename", file.Name()).Error("error cleaning up archive file")
log.Error("error cleaning up archive file", "error", err, "filename", file.Name())

Check warning on line 493 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L493

Added line #L493 was not covered by tests
}
}
}()
Expand All @@ -500,9 +500,7 @@
writer := bufio.NewWriter(gzWriter)
defer file.Close()

log.WithFields(logrus.Fields{
"filename": file.Name(),
}).Debug("creating new archive file")
log.Debug("creating new archive file", "filename", file.Name())

recordCount := 0
switch archive.ArchiveType {
Expand Down Expand Up @@ -540,13 +538,13 @@
archive.RecordCount = recordCount
archive.BuildTime = int(dates.Since(start) / time.Millisecond)

log.WithFields(logrus.Fields{
"record_count": recordCount,
"filename": file.Name(),
"file_size": archive.Size,
"file_hash": archive.Hash,
"elapsed": dates.Since(start),
}).Debug("completed writing archive file")
log.Debug("completed writing archive file",
"record_count", recordCount,
"filename", file.Name(),
"file_size", archive.Size,
"file_hash", archive.Hash,
"elapsed", dates.Since(start),
)

return nil
}
Expand Down Expand Up @@ -578,16 +576,15 @@

archive.NeedsDeletion = true

logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"period": archive.Period,
"url": archive.URL,
"file_size": archive.Size,
"file_hash": archive.Hash,
}).Debug("completed uploading archive file")

slog.Debug("completed uploading archive file",
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"period", archive.Period,
"url", archive.URL,
"file_size", archive.Size,
"file_hash", archive.Hash,
)
return nil
}

Expand Down Expand Up @@ -667,14 +664,14 @@
return errors.Wrapf(err, "error deleting temp archive file: %s", archive.ArchiveFile)
}

logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"periond": archive.Period,
"db_archive_id": archive.ID,
"filename": archive.ArchiveFile,
}).Debug("deleted temporary archive file")
slog.Debug("deleted temporary archive file",
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"periond", archive.Period,
"db_archive_id", archive.ID,
"filename", archive.ArchiveFile,
)
return nil
}

Expand Down Expand Up @@ -722,7 +719,7 @@
if !config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
logrus.WithError(err).Error("error deleting temporary archive file")
slog.Error("error deleting temporary archive file", "error", err)

Check warning on line 722 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L722

Added line #L722 was not covered by tests
}
}
}()
Expand All @@ -743,21 +740,21 @@
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})
log := slog.With("org_id", org.ID, "org_name", org.Name)

created := make([]*Archive, 0, len(archives))
failed := make([]*Archive, 0, 5)

for _, archive := range archives {
log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Debug("starting archive")
log.With("start_date", archive.StartDate, "end_date", archive.endDate(), "period", archive.Period, "archive_type", archive.ArchiveType).Debug("starting archive")
start := dates.Now()

err := createArchive(ctx, db, config, s3Client, archive)
if err != nil {
log.WithError(err).Error("error creating archive")
log.Error("error creating archive", "error", err)
failed = append(failed, archive)
} else {
log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Debug("archive complete")
log.Debug("archive complete", "id", archive.ID, "record_count", archive.RecordCount, "elapsed", dates.Since(start))
created = append(created, archive)
}
}
Expand All @@ -770,7 +767,7 @@
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType})
log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
Expand All @@ -783,41 +780,41 @@

// build them from rollups
for _, archive := range archives {
log := log.WithFields(logrus.Fields{"start_date": archive.StartDate})
log := log.With("start_date", archive.StartDate)
start := dates.Now()

err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error building monthly archive")
log.Error("error building monthly archive", "error", err)
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
log.WithError(err).Error("error writing archive to s3")
log.Error("error writing archive to s3", "error", err)

Check warning on line 796 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L796

Added line #L796 was not covered by tests
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
log.WithError(err).Error("error writing record to db")
log.Error("error writing record to db", "error", err)

Check warning on line 804 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L804

Added line #L804 was not covered by tests
failed = append(failed, archive)
continue
}

if !config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
log.WithError(err).Error("error deleting temporary file")
log.Error("error deleting temporary file", "error", err)

Check warning on line 812 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L812

Added line #L812 was not covered by tests
continue
}
}

log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Info("rollup created")
log.Info("rollup created", "id", archive.ID, "record_count", archive.RecordCount, "elapsed", dates.Since(start))
created = append(created, archive)
}

Expand All @@ -839,14 +836,14 @@
// for each archive
deleted := make([]*Archive, 0, len(archives))
for _, a := range archives {
log := logrus.WithFields(logrus.Fields{
"archive_id": a.ID,
"org_id": a.OrgID,
"type": a.ArchiveType,
"count": a.RecordCount,
"start": a.StartDate,
"period": a.Period,
})
log := slog.With(
"archive_id", a.ID,
"org_id", a.OrgID,
"type", a.ArchiveType,
"count", a.RecordCount,
"start", a.StartDate,
"period", a.Period,
)

start := dates.Now()

Expand All @@ -868,20 +865,20 @@
}

if err != nil {
log.WithError(err).Error("error deleting archive")
log.Error("error deleting archive", "error", err)
continue
}

deleted = append(deleted, a)
log.WithFields(logrus.Fields{"elapsed": dates.Since(start)}).Info("deleted archive records")
log.Info("deleted archive records", "elapsed", dates.Since(start))
}

return deleted, nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})
log := slog.With("org_id", org.ID, "org_name", org.Name)
start := dates.Now()

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
Expand All @@ -892,7 +889,7 @@
if len(dailiesCreated) > 0 {
elapsed := dates.Since(start)
rate := float32(countRecords(dailiesCreated)) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
log.Info("completed archival for org", "elapsed", elapsed, "records_per_second", rate)
}

rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
Expand Down Expand Up @@ -939,12 +936,12 @@
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org_id", org.ID).WithField("org_name", org.Name)
log := slog.With("org_id", org.ID, "org_name", org.Name)

if cfg.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages")
log.Error("error archiving org messages", "error", err, "archive_type", MessageType)

Check warning on line 944 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L944

Added line #L944 was not covered by tests
}
totalMsgsRecordsArchived += countRecords(dailiesCreated)
totalMsgsArchivesCreated += len(dailiesCreated)
Expand All @@ -955,7 +952,7 @@
if cfg.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType)
if err != nil {
log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs")
log.Error("error archiving org runs", "error", err, "archive_type", RunType)

Check warning on line 955 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L955

Added line #L955 was not covered by tests
}
totalRunsRecordsArchived += countRecords(dailiesCreated)
totalRunsArchivesCreated += len(dailiesCreated)
Expand All @@ -968,7 +965,7 @@
}

timeTaken := dates.Now().Sub(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")
slog.Info("archiving of active orgs complete", "time_taken", timeTaken, "num_orgs", len(orgs))

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.orgs_archived", float64(len(orgs)))
Expand Down
5 changes: 3 additions & 2 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"compress/gzip"
"context"
"io"
"log/slog"
"os"
"testing"
"time"
Expand All @@ -13,7 +14,6 @@ import (
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -26,7 +26,8 @@ func setup(t *testing.T) *sqlx.DB {

_, err = db.Exec(string(testDB))
assert.NoError(t, err)
logrus.SetLevel(logrus.DebugLevel)

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return db
}
Expand Down
Loading
Loading