Skip to content

Commit

Permalink
Merge pull request #100 from nyaruka/use-std-errors
Browse files Browse the repository at this point in the history
Use std lib errors
  • Loading branch information
rowanseymour authored Jun 3, 2024
2 parents 9fca4ed + 6f98234 commit 0c9d0a2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 69 deletions.
79 changes: 39 additions & 40 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/pkg/errors"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -104,7 +103,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error

rows, err := db.QueryxContext(ctx, sqlLookupActiveOrgs)
if err != nil {
return nil, errors.Wrapf(err, "error fetching active orgs")
return nil, fmt.Errorf("error fetching active orgs: %w", err)
}
defer rows.Close()

Expand All @@ -113,7 +112,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error
org := Org{RetentionPeriod: conf.RetentionPeriod}
err = rows.StructScan(&org)
if err != nil {
return nil, errors.Wrapf(err, "error scanning active org")
return nil, fmt.Errorf("error scanning active org: %w", err)
}
orgs = append(orgs, org)
}
Expand All @@ -135,7 +134,7 @@ func GetCurrentArchives(ctx context.Context, db *sqlx.DB, org Org, archiveType A
archives := make([]*Archive, 0, 1)
err := db.SelectContext(ctx, &archives, sqlLookupOrgArchives, org.ID, archiveType)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting current archives for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error selecting current archives for org: %d and type: %s: %w", org.ID, archiveType, err)
}

return archives, nil
Expand All @@ -155,7 +154,7 @@ func GetArchivesNeedingDeletion(ctx context.Context, db *sqlx.DB, org Org, archi
archives := make([]*Archive, 0, 1)
err := db.SelectContext(ctx, &archives, sqlLookupArchivesNeedingDeletion, org.ID, archiveType)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting archives needing deletion for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error selecting archives needing deletion for org: %d and type: %s: %w", org.ID, archiveType, err)
}

return archives, nil
Expand All @@ -175,7 +174,7 @@ func GetCurrentArchiveCount(ctx context.Context, db *sqlx.DB, org Org, archiveTy

err := db.GetContext(ctx, &archiveCount, sqlCountOrgArchives, org.ID, archiveType)
if err != nil {
return 0, errors.Wrapf(err, "error querying archive count for org: %d and type: %s", org.ID, archiveType)
return 0, fmt.Errorf("error querying archive count for org: %d and type: %s: %w", org.ID, archiveType, err)
}

return archiveCount, nil
Expand All @@ -197,7 +196,7 @@ func GetDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, org Org, arc

err := db.SelectContext(ctx, &existingArchives, sqlLookupOrgDailyArchivesForDateRange, org.ID, archiveType, DayPeriod, startDate, endDate)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting daily archives for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error selecting daily archives for org: %d and type: %s: %w", org.ID, archiveType, err)
}

return existingArchives, nil
Expand Down Expand Up @@ -241,7 +240,7 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start

rows, err := db.QueryxContext(ctx, sqlLookupMissingDailyArchive, startDate, endDate, org.ID, DayPeriod, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing daily archives for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error getting missing daily archives for org: %d and type: %s: %w", org.ID, archiveType, err)
}
defer rows.Close()

Expand All @@ -250,7 +249,7 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start

err = rows.Scan(&missingDay)
if err != nil {
return nil, errors.Wrapf(err, "error scanning missing daily archive for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error scanning missing daily archive for org: %d and type: %s: %w", org.ID, archiveType, err)
}
archive := Archive{
Org: org,
Expand Down Expand Up @@ -295,7 +294,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,

rows, err := db.QueryxContext(ctx, sqlLookupMissingMonthlyArchive, startDate, endDate, org.ID, MonthPeriod, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing monthly archive for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error getting missing monthly archive for org: %d and type: %s: %w", org.ID, archiveType, err)
}
defer rows.Close()

Expand All @@ -304,7 +303,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,

err = rows.Scan(&missingMonth)
if err != nil {
return nil, errors.Wrapf(err, "error scanning missing monthly archive for org: %d and type: %s", org.ID, archiveType)
return nil, fmt.Errorf("error scanning missing monthly archive for org: %d and type: %s: %w", org.ID, archiveType, err)
}
archive := Archive{
Org: org,
Expand Down Expand Up @@ -349,7 +348,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
filename := fmt.Sprintf("%s_%d_%s_%d_%02d_", monthlyArchive.ArchiveType, monthlyArchive.Org.ID, monthlyArchive.Period, monthlyArchive.StartDate.Year(), monthlyArchive.StartDate.Month())
file, err := os.CreateTemp(conf.TempDir, filename)
if err != nil {
return errors.Wrapf(err, "error creating temp file: %s", filename)
return fmt.Errorf("error creating temp file: %s: %w", filename, err)
}
writerHash := md5.New()
gzWriter := gzip.NewWriter(io.MultiWriter(file, writerHash))
Expand Down Expand Up @@ -378,21 +377,21 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client

reader, err := GetS3File(ctx, s3Client, daily.URL)
if err != nil {
return errors.Wrapf(err, "error reading S3 URL: %s", daily.URL)
return fmt.Errorf("error reading S3 URL: %s: %w", daily.URL, err)
}

// set up our reader to calculate our hash along the way
readerHash := md5.New()
teeReader := io.TeeReader(reader, readerHash)
gzipReader, err := gzip.NewReader(teeReader)
if err != nil {
return errors.Wrapf(err, "error creating gzip reader")
return fmt.Errorf("error creating gzip reader: %w", err)
}

// copy this daily file (uncompressed) to our new monthly file
_, err = io.Copy(writer, gzipReader)
if err != nil {
return errors.Wrapf(err, "error copying from s3 to disk for URL: %s", daily.URL)
return fmt.Errorf("error copying from s3 to disk for URL: %s: %w", daily.URL, err)
}

reader.Close()
Expand Down Expand Up @@ -422,7 +421,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
monthlyArchive.Hash = hex.EncodeToString(writerHash.Sum(nil))
stat, err := file.Stat()
if err != nil {
return errors.Wrapf(err, "error statting file: %s", file.Name())
return fmt.Errorf("error statting file: %s: %w", file.Name(), err)
}
monthlyArchive.Size = stat.Size()
monthlyArchive.RecordCount = recordCount
Expand All @@ -444,7 +443,7 @@ func EnsureTempArchiveDirectory(path string) error {
if os.IsNotExist(err) {
return os.MkdirAll(path, 0700)
} else if err != nil {
return errors.Wrapf(err, "error statting temp dir: %s", path)
return fmt.Errorf("error statting temp dir: %s: %w", path, err)
}

// is path a directory
Expand Down Expand Up @@ -482,7 +481,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
filename := fmt.Sprintf("%s_%d_%s%d%02d%02d_", archive.ArchiveType, archive.Org.ID, archive.Period, archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day())
file, err := os.CreateTemp(archivePath, filename)
if err != nil {
return errors.Wrapf(err, "error creating temp file: %s", filename)
return fmt.Errorf("error creating temp file: %s: %w", filename, err)
}

defer func() {
Expand Down Expand Up @@ -513,24 +512,24 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
}

if err != nil {
return errors.Wrapf(err, "error writing archive")
return fmt.Errorf("error writing archive: %w", err)
}

err = writer.Flush()
if err != nil {
return errors.Wrapf(err, "error flushing archive file")
return fmt.Errorf("error flushing archive file: %w", err)
}

err = gzWriter.Close()
if err != nil {
return errors.Wrapf(err, "error closing archive gzip writer")
return fmt.Errorf("error closing archive gzip writer: %w", err)
}

// calculate our size and hash
archive.Hash = hex.EncodeToString(hash.Sum(nil))
stat, err := file.Stat()
if err != nil {
return errors.Wrapf(err, "error calculating archive hash")
return fmt.Errorf("error calculating archive hash: %w", err)
}

archive.ArchiveFile = file.Name()
Expand Down Expand Up @@ -571,7 +570,7 @@ func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, a

err := UploadToS3(ctx, s3Client, bucket, archivePath, archive)
if err != nil {
return errors.Wrapf(err, "error uploading archive to S3")
return fmt.Errorf("error uploading archive to S3: %w", err)
}

archive.NeedsDeletion = true
Expand Down Expand Up @@ -603,20 +602,20 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error

tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return errors.Wrapf(err, "error starting transaction")
return fmt.Errorf("error starting transaction: %w", err)
}

rows, err := tx.NamedQuery(sqlInsertArchive, archive)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error inserting archive")
return fmt.Errorf("error inserting archive: %w", err)
}

rows.Next()
err = rows.Scan(&archive.ID)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error reading new archive id")
return fmt.Errorf("error reading new archive id: %w", err)
}
rows.Close()

Expand All @@ -631,12 +630,12 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error
result, err := tx.ExecContext(ctx, `UPDATE archives_archive SET rollup_id = $1 WHERE id = ANY($2)`, archive.ID, pq.Array(childIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error updating rollup ids")
return fmt.Errorf("error updating rollup ids: %w", err)
}
affected, err := result.RowsAffected()
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error getting number of rollup ids updated")
return fmt.Errorf("error getting number of rollup ids updated: %w", err)
}
if int(affected) != len(childIDs) {
tx.Rollback()
Expand All @@ -647,7 +646,7 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error
err = tx.Commit()
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error committing new archive transaction")
return fmt.Errorf("error committing new archive transaction: %w", err)
}
return nil
}
Expand All @@ -661,7 +660,7 @@ func DeleteArchiveFile(archive *Archive) error {
err := os.Remove(archive.ArchiveFile)

if err != nil {
return errors.Wrapf(err, "error deleting temp archive file: %s", archive.ArchiveFile)
return fmt.Errorf("error deleting temp archive file: %s: %w", archive.ArchiveFile, err)
}

slog.Debug("deleted temporary archive file",
Expand All @@ -679,7 +678,7 @@ func DeleteArchiveFile(archive *Archive) error {
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
if err != nil {
return nil, nil, nil, nil, errors.Wrapf(err, "error getting current archive count")
return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err)
}

var dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed []*Archive
Expand All @@ -688,7 +687,7 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
if archiveCount == 0 {
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing monthly archives")
return nil, nil, nil, nil, fmt.Errorf("error getting missing monthly archives: %w", err)
}

// we first create monthly archives
Expand All @@ -698,7 +697,7 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
// then add in daily archives taking into account the monthly that have been built
daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, errors.Wrapf(err, "error getting missing daily archives")
return nil, nil, nil, nil, fmt.Errorf("error getting missing daily archives: %w", err)
}

// we then create missing daily archives
Expand All @@ -712,7 +711,7 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error {
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
if err != nil {
return errors.Wrap(err, "error writing archive file")
return fmt.Errorf("error writing archive file: %w", err)
}

defer func() {
Expand All @@ -727,13 +726,13 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
return errors.Wrap(err, "error writing archive to s3")
return fmt.Errorf("error writing archive to s3: %w", err)
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
return errors.Wrap(err, "error writing record to db")
return fmt.Errorf("error writing record to db: %w", err)
}

return nil
Expand Down Expand Up @@ -883,7 +882,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error creating archives")
return nil, nil, nil, nil, nil, fmt.Errorf("error creating archives: %w", err)
}

if len(dailiesCreated) > 0 {
Expand All @@ -894,7 +893,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3

rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, errors.Wrapf(err, "error rolling up archives")
return nil, nil, nil, nil, nil, fmt.Errorf("error rolling up archives: %w", err)
}

monthliesCreated = append(monthliesCreated, rollupsCreated...)
Expand All @@ -906,7 +905,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if err != nil {
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, errors.Wrapf(err, "error deleting archived records")
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err)
}
}

Expand All @@ -923,7 +922,7 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
cancel()

if err != nil {
return errors.Wrap(err, "error getting active orgs")
return fmt.Errorf("error getting active orgs: %w", err)
}

totalRunsRecordsArchived, totalMsgsRecordsArchived := 0, 0
Expand Down
Loading

0 comments on commit 0c9d0a2

Please sign in to comment.