Skip to content

Commit

Permalink
Add runtime.Runtime to hold config, DB and S3
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jul 26, 2024
1 parent 1b82d85 commit 58cc7b8
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 159 deletions.
98 changes: 49 additions & 49 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/s3x"
"github.com/nyaruka/rp-archiver/runtime"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -97,19 +97,19 @@ const sqlLookupActiveOrgs = `
ORDER BY id`

// GetActiveOrgs returns the active organizations sorted by id
func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error) {
func GetActiveOrgs(ctx context.Context, rt *runtime.Runtime) ([]Org, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

rows, err := db.QueryxContext(ctx, sqlLookupActiveOrgs)
rows, err := rt.DB.QueryxContext(ctx, sqlLookupActiveOrgs)
if err != nil {
return nil, fmt.Errorf("error fetching active orgs: %w", err)
}
defer rows.Close()

orgs := make([]Org, 0, 10)
for rows.Next() {
org := Org{RetentionPeriod: conf.RetentionPeriod}
org := Org{RetentionPeriod: rt.Config.RetentionPeriod}
err = rows.StructScan(&org)
if err != nil {
return nil, fmt.Errorf("error scanning active org: %w", err)
Expand Down Expand Up @@ -320,7 +320,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,
}

// BuildRollupArchive builds a monthly archive from the files present on S3
func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client *s3x.Service, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
func BuildRollupArchive(ctx context.Context, rt *runtime.Runtime, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
ctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()

Expand All @@ -335,7 +335,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
}

// grab all the daily archives we need
missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, db, startDate, endDate, org, archiveType)
missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, rt.DB, startDate, endDate, org, archiveType)
if err != nil {
return err
}
Expand All @@ -346,7 +346,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client

// great, we have all the dailies we need, download them
filename := fmt.Sprintf("%s_%d_%s_%d_%02d_", monthlyArchive.ArchiveType, monthlyArchive.Org.ID, monthlyArchive.Period, monthlyArchive.StartDate.Year(), monthlyArchive.StartDate.Month())
file, err := os.CreateTemp(conf.TempDir, filename)
file, err := os.CreateTemp(rt.Config.TempDir, filename)
if err != nil {
return fmt.Errorf("error creating temp file: %s: %w", filename, err)
}
Expand All @@ -357,7 +357,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client

recordCount := 0

dailies, err := GetDailyArchivesForDateRange(ctx, db, org, archiveType, startDate, endDate)
dailies, err := GetDailyArchivesForDateRange(ctx, rt.DB, org, archiveType, startDate, endDate)
if err != nil {
return err
}
Expand All @@ -375,7 +375,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
continue
}

reader, err := GetS3File(ctx, s3Client, daily.URL)
reader, err := GetS3File(ctx, rt.S3, daily.URL)
if err != nil {
return fmt.Errorf("error reading S3 URL: %s: %w", daily.URL, err)
}
Expand Down Expand Up @@ -549,7 +549,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
}

// UploadArchive uploads the passed archive file to S3
func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, archive *Archive) error {
func UploadArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

Expand All @@ -568,7 +568,7 @@ func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, ar
archive.Hash)
}

err := UploadToS3(ctx, s3Client, bucket, archivePath, archive)
err := UploadToS3(ctx, rt.S3, rt.Config.S3Bucket, archivePath, archive)
if err != nil {
return fmt.Errorf("error uploading archive to S3: %w", err)
}
Expand Down Expand Up @@ -675,8 +675,8 @@ func DeleteArchiveFile(archive *Archive) error {
}

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
func CreateOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, rt.DB, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err)
}
Expand All @@ -685,60 +685,60 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s

// no existing archives means this might be a backfill, figure out if there are full months we can build first
if archiveCount == 0 {
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting missing monthly archives: %w", err)
}

// we first create monthly archives
monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives)
monthliesCreated, monthliesFailed = createArchives(ctx, rt, org, archives)
}

// then add in daily archives taking into account the monthly that have been built
daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType)
daily, err := GetMissingDailyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting missing daily archives: %w", err)
}

// we then create missing daily archives
dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily)
dailiesCreated, dailiesFailed = createArchives(ctx, rt, org, daily)

defer ctx.Done()

return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil
}

func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, archive *Archive) error {
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
func createArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error {
err := CreateArchiveFile(ctx, rt.DB, archive, rt.Config.TempDir)
if err != nil {
return fmt.Errorf("error writing archive file: %w", err)
}

defer func() {
if !config.KeepFiles {
if !rt.Config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
slog.Error("error deleting temporary archive file", "error", err)
}
}
}()

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if rt.Config.UploadToS3 {
err = UploadArchive(ctx, rt, archive)
if err != nil {
return fmt.Errorf("error writing archive to s3: %w", err)
}
}

err = WriteArchiveToDB(ctx, db, archive)
err = WriteArchiveToDB(ctx, rt.DB, archive)
if err != nil {
return fmt.Errorf("error writing record to db: %w", err)
}

return nil
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, org Org, archives []*Archive) ([]*Archive, []*Archive) {
func createArchives(ctx context.Context, rt *runtime.Runtime, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := slog.With("org_id", org.ID, "org_name", org.Name)

created := make([]*Archive, 0, len(archives))
Expand All @@ -748,7 +748,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *
log.With("start_date", archive.StartDate, "end_date", archive.endDate(), "period", archive.Period, "archive_type", archive.ArchiveType).Debug("starting archive")
start := dates.Now()

err := createArchive(ctx, db, config, s3Client, archive)
err := createArchive(ctx, rt, archive)
if err != nil {
log.Error("error creating archive", "error", err)
failed = append(failed, archive)
Expand All @@ -762,14 +762,14 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *
}

// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
func RollupOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, err
}
Expand All @@ -782,30 +782,30 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
log := log.With("start_date", archive.StartDate)
start := dates.Now()

err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
err = BuildRollupArchive(ctx, rt, archive, now, org, archiveType)
if err != nil {
log.Error("error building monthly archive", "error", err)
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if rt.Config.UploadToS3 {
err = UploadArchive(ctx, rt, archive)
if err != nil {
log.Error("error writing archive to s3", "error", err)
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
err = WriteArchiveToDB(ctx, rt.DB, archive)
if err != nil {
log.Error("error writing record to db", "error", err)
failed = append(failed, archive)
continue
}

if !config.KeepFiles {
if !rt.Config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
log.Error("error deleting temporary file", "error", err)
Expand All @@ -825,9 +825,9 @@ const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FA
var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, error) {
func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
// get all the archives that haven't yet been deleted
archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
archives, err := GetArchivesNeedingDeletion(ctx, rt.DB, org, archiveType)
if err != nil {
return nil, fmt.Errorf("error finding archives needing deletion '%s'", archiveType)
}
Expand All @@ -848,15 +848,15 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config

switch a.ArchiveType {
case MessageType:
err = DeleteArchivedMessages(ctx, config, db, s3Client, a)
err = DeleteArchivedMessages(ctx, rt, a)
if err == nil {
err = DeleteBroadcasts(ctx, now, config, db, org)
err = DeleteBroadcasts(ctx, rt, now, org)
}

case RunType:
err = DeleteArchivedRuns(ctx, config, db, s3Client, a)
err = DeleteArchivedRuns(ctx, rt, a)
if err == nil {
err = DeleteFlowStarts(ctx, now, config, db, org)
err = DeleteFlowStarts(ctx, rt, now, org)
}

default:
Expand All @@ -876,11 +876,11 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := slog.With("org_id", org.ID, "org_name", org.Name)
start := dates.Now()

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, rt, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("error creating archives: %w", err)
}
Expand All @@ -891,7 +891,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
log.Info("completed archival for org", "elapsed", elapsed, "records_per_second", rate)
}

rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, rt, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("error rolling up archives: %w", err)
}
Expand All @@ -902,8 +902,8 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3

// finally delete any archives not yet actually archived
var deleted []*Archive
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if rt.Config.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, rt, now, org, archiveType)
if err != nil {
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err)
}
Expand All @@ -913,12 +913,12 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
}

// ArchiveActiveOrgs fetches active orgs and archives messages and runs
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
func ArchiveActiveOrgs(rt *runtime.Runtime) error {
start := dates.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := GetActiveOrgs(ctx, db, cfg)
orgs, err := GetActiveOrgs(ctx, rt)
cancel()

if err != nil {
Expand All @@ -937,8 +937,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := slog.With("org_id", org.ID, "org_name", org.Name)

if cfg.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType)
if rt.Config.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, MessageType)
if err != nil {
log.Error("error archiving org messages", "error", err, "archive_type", MessageType)
}
Expand All @@ -948,8 +948,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
totalMsgsRollupsCreated += len(monthliesCreated)
totalMsgsRollupsFailed += len(monthliesFailed)
}
if cfg.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType)
if rt.Config.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, RunType)
if err != nil {
log.Error("error archiving org runs", "error", err, "archive_type", RunType)
}
Expand Down
Loading

0 comments on commit 58cc7b8

Please sign in to comment.