diff --git a/archives/archives.go b/archives/archives.go index a813342..d46e3d3 100644 --- a/archives/archives.go +++ b/archives/archives.go @@ -18,7 +18,7 @@ import ( "github.com/lib/pq" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/dates" - "github.com/nyaruka/gocommon/s3x" + "github.com/nyaruka/rp-archiver/runtime" ) // ArchiveType is the type for the archives @@ -97,11 +97,11 @@ const sqlLookupActiveOrgs = ` ORDER BY id` // GetActiveOrgs returns the active organizations sorted by id -func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error) { +func GetActiveOrgs(ctx context.Context, rt *runtime.Runtime) ([]Org, error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() - rows, err := db.QueryxContext(ctx, sqlLookupActiveOrgs) + rows, err := rt.DB.QueryxContext(ctx, sqlLookupActiveOrgs) if err != nil { return nil, fmt.Errorf("error fetching active orgs: %w", err) } @@ -109,7 +109,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error orgs := make([]Org, 0, 10) for rows.Next() { - org := Org{RetentionPeriod: conf.RetentionPeriod} + org := Org{RetentionPeriod: rt.Config.RetentionPeriod} err = rows.StructScan(&org) if err != nil { return nil, fmt.Errorf("error scanning active org: %w", err) @@ -320,7 +320,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time, } // BuildRollupArchive builds a monthly archive from the files present on S3 -func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client *s3x.Service, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error { +func BuildRollupArchive(ctx context.Context, rt *runtime.Runtime, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error { ctx, cancel := context.WithTimeout(ctx, time.Hour) defer cancel() @@ -335,7 +335,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client } // grab all the daily archives we need - missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, db, startDate, endDate, org, archiveType) + missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, rt.DB, startDate, endDate, org, archiveType) if err != nil { return err } @@ -346,7 +346,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client // great, we have all the dailies we need, download them filename := fmt.Sprintf("%s_%d_%s_%d_%02d_", monthlyArchive.ArchiveType, monthlyArchive.Org.ID, monthlyArchive.Period, monthlyArchive.StartDate.Year(), monthlyArchive.StartDate.Month()) - file, err := os.CreateTemp(conf.TempDir, filename) + file, err := os.CreateTemp(rt.Config.TempDir, filename) if err != nil { return fmt.Errorf("error creating temp file: %s: %w", filename, err) } @@ -357,7 +357,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client recordCount := 0 - dailies, err := GetDailyArchivesForDateRange(ctx, db, org, archiveType, startDate, endDate) + dailies, err := GetDailyArchivesForDateRange(ctx, rt.DB, org, archiveType, startDate, endDate) if err != nil { return err } @@ -375,7 +375,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client continue } - reader, err := GetS3File(ctx, s3Client, daily.URL) + reader, err := GetS3File(ctx, rt.S3, daily.URL) if err != nil { return fmt.Errorf("error reading S3 URL: %s: %w", daily.URL, err) } @@ -549,7 +549,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi } // UploadArchive uploads the passed archive file to S3 -func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, archive *Archive) error { +func UploadArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error { ctx, cancel := context.WithTimeout(ctx, time.Minute*15) defer cancel() @@ -568,7 +568,7 @@ func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, ar archive.Hash) } - err := UploadToS3(ctx, s3Client, bucket, archivePath, archive) + err := UploadToS3(ctx, rt.S3, rt.Config.S3Bucket, archivePath, archive) if err != nil { return fmt.Errorf("error uploading archive to S3: %w", err) } @@ -675,8 +675,8 @@ func DeleteArchiveFile(archive *Archive) error { } // CreateOrgArchives builds all the missing archives for the passed in org -func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) { - archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType) +func CreateOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) { + archiveCount, err := GetCurrentArchiveCount(ctx, rt.DB, org, archiveType) if err != nil { return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err) } @@ -685,37 +685,37 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s // no existing archives means this might be a backfill, figure out if there are full months we can build first if archiveCount == 0 { - archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) + archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType) if err != nil { return nil, nil, nil, nil, fmt.Errorf("error getting missing monthly archives: %w", err) } // we first create monthly archives - monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives) + monthliesCreated, monthliesFailed = createArchives(ctx, rt, org, archives) } // then add in daily archives taking into account the monthly that have been built - daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType) + daily, err := GetMissingDailyArchives(ctx, rt.DB, now, org, archiveType) if err != nil { return nil, nil, nil, nil, fmt.Errorf("error getting missing daily archives: %w", err) } // we then create missing daily archives - dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily) + dailiesCreated, dailiesFailed = createArchives(ctx, rt, org, daily) defer ctx.Done() return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil } -func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, archive *Archive) error { - err := CreateArchiveFile(ctx, db, archive, config.TempDir) +func createArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error { + err := CreateArchiveFile(ctx, rt.DB, archive, rt.Config.TempDir) if err != nil { return fmt.Errorf("error writing archive file: %w", err) } defer func() { - if !config.KeepFiles { + if !rt.Config.KeepFiles { err := DeleteArchiveFile(archive) if err != nil { slog.Error("error deleting temporary archive file", "error", err) @@ -723,14 +723,14 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s } }() - if config.UploadToS3 { - err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) + if rt.Config.UploadToS3 { + err = UploadArchive(ctx, rt, archive) if err != nil { return fmt.Errorf("error writing archive to s3: %w", err) } } - err = WriteArchiveToDB(ctx, db, archive) + err = WriteArchiveToDB(ctx, rt.DB, archive) if err != nil { return fmt.Errorf("error writing record to db: %w", err) } @@ -738,7 +738,7 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s return nil } -func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, org Org, archives []*Archive) ([]*Archive, []*Archive) { +func createArchives(ctx context.Context, rt *runtime.Runtime, org Org, archives []*Archive) ([]*Archive, []*Archive) { log := slog.With("org_id", org.ID, "org_name", org.Name) created := make([]*Archive, 0, len(archives)) @@ -748,7 +748,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client * log.With("start_date", archive.StartDate, "end_date", archive.endDate(), "period", archive.Period, "archive_type", archive.ArchiveType).Debug("starting archive") start := dates.Now() - err := createArchive(ctx, db, config, s3Client, archive) + err := createArchive(ctx, rt, archive) if err != nil { log.Error("error creating archive", "error", err) failed = append(failed, archive) @@ -762,14 +762,14 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client * } // RollupOrgArchives rolls up monthly archives from our daily archives -func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { +func RollupOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) { ctx, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType) // get our missing monthly archives - archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType) + archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType) if err != nil { return nil, nil, err } @@ -782,15 +782,15 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s log := log.With("start_date", archive.StartDate) start := dates.Now() - err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType) + err = BuildRollupArchive(ctx, rt, archive, now, org, archiveType) if err != nil { log.Error("error building monthly archive", "error", err) failed = append(failed, archive) continue } - if config.UploadToS3 { - err = UploadArchive(ctx, s3Client, config.S3Bucket, archive) + if rt.Config.UploadToS3 { + err = UploadArchive(ctx, rt, archive) if err != nil { log.Error("error writing archive to s3", "error", err) failed = append(failed, archive) @@ -798,14 +798,14 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s } } - err = WriteArchiveToDB(ctx, db, archive) + err = WriteArchiveToDB(ctx, rt.DB, archive) if err != nil { log.Error("error writing record to db", "error", err) failed = append(failed, archive) continue } - if !config.KeepFiles { + if !rt.Config.KeepFiles { err := DeleteArchiveFile(archive) if err != nil { log.Error("error deleting temporary file", "error", err) @@ -825,9 +825,9 @@ const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FA var deleteTransactionSize = 100 // DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created -func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, error) { +func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) { // get all the archives that haven't yet been deleted - archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType) + archives, err := GetArchivesNeedingDeletion(ctx, rt.DB, org, archiveType) if err != nil { return nil, fmt.Errorf("error finding archives needing deletion '%s'", archiveType) } @@ -848,15 +848,15 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config switch a.ArchiveType { case MessageType: - err = DeleteArchivedMessages(ctx, config, db, s3Client, a) + err = DeleteArchivedMessages(ctx, rt, a) if err == nil { - err = DeleteBroadcasts(ctx, now, config, db, org) + err = DeleteBroadcasts(ctx, rt, now, org) } case RunType: - err = DeleteArchivedRuns(ctx, config, db, s3Client, a) + err = DeleteArchivedRuns(ctx, rt, a) if err == nil { - err = DeleteFlowStarts(ctx, now, config, db, org) + err = DeleteFlowStarts(ctx, rt, now, org) } default: @@ -876,11 +876,11 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config } // ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives -func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) { +func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) { log := slog.With("org_id", org.ID, "org_name", org.Name) start := dates.Now() - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, rt, now, org, archiveType) if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("error creating archives: %w", err) } @@ -891,7 +891,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3 log.Info("completed archival for org", "elapsed", elapsed, "records_per_second", rate) } - rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType) + rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, rt, now, org, archiveType) if err != nil { return nil, nil, nil, nil, nil, fmt.Errorf("error rolling up archives: %w", err) } @@ -902,8 +902,8 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3 // finally delete any archives not yet actually archived var deleted []*Archive - if cfg.Delete { - deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType) + if rt.Config.Delete { + deleted, err = DeleteArchivedOrgRecords(ctx, rt, now, org, archiveType) if err != nil { return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err) } @@ -913,12 +913,12 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3 } // ArchiveActiveOrgs fetches active orgs and archives messages and runs -func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error { +func ArchiveActiveOrgs(rt *runtime.Runtime) error { start := dates.Now() // get our active orgs ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - orgs, err := GetActiveOrgs(ctx, db, cfg) + orgs, err := GetActiveOrgs(ctx, rt) cancel() if err != nil { @@ -937,8 +937,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error { ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12) log := slog.With("org_id", org.ID, "org_name", org.Name) - if cfg.ArchiveMessages { - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType) + if rt.Config.ArchiveMessages { + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, MessageType) if err != nil { log.Error("error archiving org messages", "error", err, "archive_type", MessageType) } @@ -948,8 +948,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error { totalMsgsRollupsCreated += len(monthliesCreated) totalMsgsRollupsFailed += len(monthliesFailed) } - if cfg.ArchiveRuns { - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType) + if rt.Config.ArchiveRuns { + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, RunType) if err != nil { log.Error("error archiving org runs", "error", err, "archive_type", RunType) } diff --git a/archives/archives_test.go b/archives/archives_test.go index b8d6cb7..919adee 100644 --- a/archives/archives_test.go +++ b/archives/archives_test.go @@ -13,11 +13,14 @@ import ( _ "github.com/lib/pq" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/dates" + "github.com/nyaruka/rp-archiver/runtime" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func setup(t *testing.T) (*Config, *sqlx.DB) { - config := NewDefaultConfig() +func setup(t *testing.T) (context.Context, *runtime.Runtime) { + ctx := context.Background() + config := runtime.NewDefaultConfig() config.DB = "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC" // configure S3 to use a local minio instance @@ -27,44 +30,45 @@ func setup(t *testing.T) (*Config, *sqlx.DB) { config.S3Minio = true testDB, err := os.ReadFile("../testdb.sql") - assert.NoError(t, err) + require.NoError(t, err) db, err := sqlx.Open("postgres", config.DB) - assert.NoError(t, err) + require.NoError(t, err) _, err = db.Exec(string(testDB)) - assert.NoError(t, err) + require.NoError(t, err) + + s3Client, err := NewS3Client(config) + require.NoError(t, err) slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))) - return config, db + return ctx, &runtime.Runtime{Config: config, DB: db, S3: s3Client} } func TestGetMissingDayArchives(t *testing.T) { - config, db := setup(t) + ctx, rt := setup(t) - ctx := context.Background() - - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) assert.Len(t, orgs, 3) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) // org 1 is too new, no tasks - tasks, err := GetMissingDailyArchives(ctx, db, now, orgs[0], MessageType) + tasks, err := GetMissingDailyArchives(ctx, rt.DB, now, orgs[0], MessageType) assert.NoError(t, err) assert.Len(t, tasks, 0) // org 2 should have some - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[1], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[1], MessageType) assert.NoError(t, err) assert.Len(t, tasks, 61) assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), tasks[60].StartDate) // org 3 is the same as 2, but two of the tasks have already been built - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Len(t, tasks, 31) assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -73,13 +77,13 @@ func TestGetMissingDayArchives(t *testing.T) { // org 3 again, but changing the archive period so we have no tasks orgs[2].RetentionPeriod = 200 - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Len(t, tasks, 0) // org 1 again, but lowering the archive period so we have tasks orgs[0].RetentionPeriod = 2 - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[0], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[0], MessageType) assert.NoError(t, err) assert.Len(t, tasks, 58) assert.Equal(t, time.Date(2017, 11, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -89,30 +93,27 @@ func TestGetMissingDayArchives(t *testing.T) { } func TestGetMissingMonthArchives(t *testing.T) { - config, db := setup(t) - - // get the tasks for our org - ctx := context.Background() + ctx, rt := setup(t) - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) // org 1 is too new, no tasks - tasks, err := GetMissingMonthlyArchives(ctx, db, now, orgs[0], MessageType) + tasks, err := GetMissingMonthlyArchives(ctx, rt.DB, now, orgs[0], MessageType) assert.NoError(t, err) assert.Equal(t, 0, len(tasks)) // org 2 should have some - tasks, err = GetMissingMonthlyArchives(ctx, db, now, orgs[1], MessageType) + tasks, err = GetMissingMonthlyArchives(ctx, rt.DB, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 2, len(tasks)) assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), tasks[1].StartDate) // org 3 is the same as 2, but two of the tasks have already been built - tasks, err = GetMissingMonthlyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingMonthlyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Equal(t, 1, len(tasks)) assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -120,23 +121,22 @@ func TestGetMissingMonthArchives(t *testing.T) { } func TestCreateMsgArchive(t *testing.T) { - config, db := setup(t) - ctx := context.Background() + ctx, rt := setup(t) err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - tasks, err := GetMissingDailyArchives(ctx, db, now, orgs[1], MessageType) + tasks, err := GetMissingDailyArchives(ctx, rt.DB, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 61, len(tasks)) task := tasks[0] // build our first task, should have no messages - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have no records and be an empty gzip file @@ -148,7 +148,7 @@ func TestCreateMsgArchive(t *testing.T) { // build our third task, should have two messages task = tasks[2] - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have two records, second will have attachments @@ -163,12 +163,12 @@ func TestCreateMsgArchive(t *testing.T) { assert.True(t, os.IsNotExist(err)) // test the anonymous case - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Equal(t, 31, len(tasks)) task = tasks[0] - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have one record @@ -196,22 +196,21 @@ func assertArchiveFile(t *testing.T, archive *Archive, truthName string) { } func TestCreateRunArchive(t *testing.T) { - config, db := setup(t) - ctx := context.Background() + ctx, rt := setup(t) err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - tasks, err := GetMissingDailyArchives(ctx, db, now, orgs[1], RunType) + tasks, err := GetMissingDailyArchives(ctx, rt.DB, now, orgs[1], RunType) assert.NoError(t, err) assert.Equal(t, 62, len(tasks)) task := tasks[0] - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have no records and be an empty gzip file @@ -222,7 +221,7 @@ func TestCreateRunArchive(t *testing.T) { DeleteArchiveFile(task) task = tasks[2] - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have two record @@ -236,13 +235,13 @@ func TestCreateRunArchive(t *testing.T) { assert.True(t, os.IsNotExist(err)) // ok, let's do an anon org - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], RunType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], RunType) assert.NoError(t, err) assert.Equal(t, 62, len(tasks)) task = tasks[0] // build our first task, should have no messages - err = CreateArchiveFile(ctx, db, task, "/tmp") + err = CreateArchiveFile(ctx, rt.DB, task, "/tmp") assert.NoError(t, err) // should have one record @@ -255,17 +254,16 @@ func TestCreateRunArchive(t *testing.T) { } func TestWriteArchiveToDB(t *testing.T) { - config, db := setup(t) - ctx := context.Background() + ctx, rt := setup(t) - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - existing, err := GetCurrentArchives(ctx, db, orgs[2], MessageType) + existing, err := GetCurrentArchives(ctx, rt.DB, orgs[2], MessageType) assert.NoError(t, err) - tasks, err := GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err := GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Equal(t, 31, len(tasks)) assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -273,19 +271,19 @@ func TestWriteArchiveToDB(t *testing.T) { task := tasks[0] task.Dailies = []*Archive{existing[0], existing[1]} - err = WriteArchiveToDB(ctx, db, task) + err = WriteArchiveToDB(ctx, rt.DB, task) assert.NoError(t, err) assert.Equal(t, 5, task.ID) assert.Equal(t, false, task.NeedsDeletion) // if we recalculate our tasks, we should have one less now - existing, err = GetCurrentArchives(ctx, db, orgs[2], MessageType) + existing, err = GetCurrentArchives(ctx, rt.DB, orgs[2], MessageType) assert.Equal(t, task.ID, *existing[0].Rollup) assert.Equal(t, task.ID, *existing[2].Rollup) assert.NoError(t, err) - tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType) + tasks, err = GetMissingDailyArchives(ctx, rt.DB, now, orgs[2], MessageType) assert.NoError(t, err) assert.Equal(t, 30, len(tasks)) assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), tasks[0].StartDate) @@ -307,22 +305,19 @@ func getCountInRange(db *sqlx.DB, query string, orgID int, start time.Time, end } func TestArchiveOrgMessages(t *testing.T) { - config, db := setup(t) - ctx := context.Background() + ctx, rt := setup(t) + deleteTransactionSize = 1 - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - config.Delete = true - - s3Client, err := NewS3Client(config) - assert.NoError(t, err) + rt.Config.Delete = true - assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) + assertCount(t, rt.DB, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, rt, now, orgs[1], MessageType) assert.NoError(t, err) assert.Equal(t, 61, len(dailiesCreated)) @@ -347,7 +342,7 @@ func TestArchiveOrgMessages(t *testing.T) { // shouldn't have any messages remaining for this org for those periods for _, d := range deleted { count, err := getCountInRange( - db, + rt.DB, getMsgCount, orgs[1].ID, d.StartDate, @@ -361,7 +356,7 @@ func TestArchiveOrgMessages(t *testing.T) { // our one message in our existing archive (but that had an invalid URL) should still exist however count, err := getCountInRange( - db, + rt.DB, getMsgCount, orgs[1].ID, time.Date(2017, 10, 8, 0, 0, 0, 0, time.UTC), @@ -372,7 +367,7 @@ func TestArchiveOrgMessages(t *testing.T) { // and messages on our other orgs should be unaffected count, err = getCountInRange( - db, + rt.DB, getMsgCount, orgs[2].ID, time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC), @@ -383,7 +378,7 @@ func TestArchiveOrgMessages(t *testing.T) { // as is our newer message which was replied to count, err = getCountInRange( - db, + rt.DB, getMsgCount, orgs[1].ID, time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC), @@ -393,7 +388,7 @@ func TestArchiveOrgMessages(t *testing.T) { assert.Equal(t, 1, count) // one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new - assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) + assertCount(t, rt.DB, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2) } const getRunCount = ` @@ -418,19 +413,15 @@ func assertArchive(t *testing.T, a *Archive, startDate time.Time, period Archive } func TestArchiveOrgRuns(t *testing.T) { - config, db := setup(t) - ctx := context.Background() + ctx, rt := setup(t) - orgs, err := GetActiveOrgs(ctx, db, config) + orgs, err := GetActiveOrgs(ctx, rt) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - config.Delete = true + rt.Config.Delete = true - s3Client, err := NewS3Client(config) - assert.NoError(t, err) - - dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) + dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, rt, now, orgs[2], RunType) assert.NoError(t, err) assert.Equal(t, 10, len(dailiesCreated)) @@ -446,7 +437,7 @@ func TestArchiveOrgRuns(t *testing.T) { // no runs remaining for _, d := range deleted { count, err := getCountInRange( - db, + rt.DB, getRunCount, orgs[2].ID, d.StartDate, @@ -461,7 +452,7 @@ func TestArchiveOrgRuns(t *testing.T) { // other org runs unaffected count, err := getCountInRange( - db, + rt.DB, getRunCount, orgs[1].ID, time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC), @@ -472,7 +463,7 @@ func TestArchiveOrgRuns(t *testing.T) { // more recent run unaffected (even though it was parent) count, err = getCountInRange( - db, + rt.DB, getRunCount, orgs[2].ID, time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC), @@ -483,7 +474,7 @@ func TestArchiveOrgRuns(t *testing.T) { // org 2 has a run that can't be archived because it's still active - as it has no existing archives // this will manifest itself as a monthly which fails to save - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType) + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, now, orgs[1], RunType) assert.NoError(t, err) assert.Equal(t, 31, len(dailiesCreated)) @@ -500,7 +491,7 @@ func TestArchiveOrgRuns(t *testing.T) { } func TestArchiveActiveOrgs(t *testing.T) { - config, db := setup(t) + _, rt := setup(t) mockAnalytics := analytics.NewMock() analytics.RegisterBackend(mockAnalytics) @@ -509,10 +500,7 @@ func TestArchiveActiveOrgs(t *testing.T) { dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC))) defer dates.SetNowSource(dates.DefaultNowSource) - s3Client, err := NewS3Client(config) - assert.NoError(t, err) - - err = ArchiveActiveOrgs(db, config, s3Client) + err := ArchiveActiveOrgs(rt) assert.NoError(t, err) assert.Equal(t, map[string][]float64{ diff --git a/archives/messages.go b/archives/messages.go index b8ad61a..5a67f1d 100644 --- a/archives/messages.go +++ b/archives/messages.go @@ -9,7 +9,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/dates" - "github.com/nyaruka/gocommon/s3x" + "github.com/nyaruka/rp-archiver/runtime" ) const ( @@ -113,7 +113,7 @@ DELETE FROM msgs_msg WHERE id IN(?)` // all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time // // Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error { +func DeleteArchivedMessages(ctx context.Context, rt *runtime.Runtime, archive *Archive) error { outer, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() @@ -129,7 +129,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 log.Info("deleting messages") // first things first, make sure our file is correct on S3 - s3Size, s3Hash, err := GetS3FileInfo(outer, s3Client, archive.URL) + s3Size, s3Hash, err := GetS3FileInfo(outer, rt.S3, archive.URL) if err != nil { return err } @@ -139,12 +139,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 } // if S3 hash is MD5 then check against archive hash - if config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash { + if rt.Config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash { return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, s3Hash) } // ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, sqlSelectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) + rows, err := rt.DB.QueryxContext(outer, sqlSelectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate()) if err != nil { return err } @@ -184,7 +184,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 start := dates.Now() // start our transaction - tx, err := db.BeginTxx(ctx, nil) + tx, err := rt.DB.BeginTxx(ctx, nil) if err != nil { return err } @@ -218,7 +218,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3 deletedOn := dates.Now() // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn) + _, err = rt.DB.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn) if err != nil { return fmt.Errorf("error setting archive as deleted: %w", err) } @@ -237,11 +237,11 @@ SELECT id LIMIT 1000000;` // DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no associated messages -func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { +func DeleteBroadcasts(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org) error { start := dates.Now() threshhold := now.AddDate(0, 0, -org.RetentionPeriod) - rows, err := db.QueryxContext(ctx, sqlSelectOldOrgBroadcasts, org.ID, threshhold) + rows, err := rt.DB.QueryxContext(ctx, sqlSelectOldOrgBroadcasts, org.ID, threshhold) if err != nil { return err } @@ -265,7 +265,7 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq } // we delete broadcasts in a transaction per broadcast - tx, err := db.BeginTx(ctx, nil) + tx, err := rt.DB.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("error starting transaction while deleting broadcast: %d: %w", broadcastID, err) } diff --git a/archives/runs.go b/archives/runs.go index 77f1544..2764a02 100644 --- a/archives/runs.go +++ b/archives/runs.go @@ -9,7 +9,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/nyaruka/gocommon/dates" - "github.com/nyaruka/gocommon/s3x" + "github.com/nyaruka/rp-archiver/runtime" ) const ( @@ -105,7 +105,7 @@ DELETE FROM flows_flowrun WHERE id IN(?)` // all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time // // Upon completion it updates the needs_deletion flag on the archive -func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error { +func DeleteArchivedRuns(ctx context.Context, rt *runtime.Runtime, archive *Archive) error { outer, cancel := context.WithTimeout(ctx, time.Hour*3) defer cancel() @@ -121,7 +121,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie log.Info("deleting runs") // first things first, make sure our file is correct on S3 - s3Size, s3Hash, err := GetS3FileInfo(outer, s3Client, archive.URL) + s3Size, s3Hash, err := GetS3FileInfo(outer, rt.S3, archive.URL) if err != nil { return err } @@ -131,12 +131,12 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie } // if S3 hash is MD5 then check against archive hash - if config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash { + if rt.Config.CheckS3Hashes && archive.Size <= maxSingleUploadBytes && s3Hash != archive.Hash { return fmt.Errorf("archive md5: %s and s3 etag: %s do not match", archive.Hash, s3Hash) } // ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big - rows, err := db.QueryxContext(outer, sqlSelectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) + rows, err := rt.DB.QueryxContext(outer, sqlSelectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate()) if err != nil { return err } @@ -179,7 +179,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie start := dates.Now() // start our transaction - tx, err := db.BeginTxx(ctx, nil) + tx, err := rt.DB.BeginTxx(ctx, nil) if err != nil { return err } @@ -207,7 +207,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie deletedOn := dates.Now() // all went well! mark our archive as no longer needing deletion - _, err = db.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn) + _, err = rt.DB.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn) if err != nil { return fmt.Errorf("error setting archive as deleted: %w", err) } @@ -226,11 +226,11 @@ const selectOldOrgFlowStarts = ` LIMIT 1000000;` // DeleteFlowStarts deletes all starts older than 90 days for the passed in org which have no associated runs -func DeleteFlowStarts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error { +func DeleteFlowStarts(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org) error { start := dates.Now() threshhold := now.AddDate(0, 0, -org.RetentionPeriod) - rows, err := db.QueryxContext(ctx, selectOldOrgFlowStarts, org.ID, threshhold) + rows, err := rt.DB.QueryxContext(ctx, selectOldOrgFlowStarts, org.ID, threshhold) if err != nil { return err } @@ -253,7 +253,7 @@ func DeleteFlowStarts(ctx context.Context, now time.Time, config *Config, db *sq } // we delete starts in a transaction per start - tx, err := db.BeginTx(ctx, nil) + tx, err := rt.DB.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("error starting transaction while deleting start: %d: %w", startID, err) } diff --git a/archives/s3.go b/archives/s3.go index 6f4d1b5..74d60f4 100644 --- a/archives/s3.go +++ b/archives/s3.go @@ -16,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/nyaruka/gocommon/s3x" + "github.com/nyaruka/rp-archiver/runtime" ) const s3BucketURL = "https://%s.s3.amazonaws.com%s" @@ -27,7 +28,7 @@ const maxSingleUploadBytes = 5e9 // 5GB const chunkSizeBytes = 1e9 // 1GB // NewS3Client creates a new s3 service from the passed in config, testing it as necessary -func NewS3Client(cfg *Config) (*s3x.Service, error) { +func NewS3Client(cfg *runtime.Config) (*s3x.Service, error) { svc, err := s3x.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.S3Endpoint, cfg.S3Minio) if err != nil { return nil, err diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go index 7221b15..3f921cc 100644 --- a/cmd/rp-archiver/main.go +++ b/cmd/rp-archiver/main.go @@ -14,8 +14,8 @@ import ( "github.com/nyaruka/ezconf" "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/gocommon/dates" - "github.com/nyaruka/gocommon/s3x" "github.com/nyaruka/rp-archiver/archives" + "github.com/nyaruka/rp-archiver/runtime" slogmulti "github.com/samber/slog-multi" slogsentry "github.com/samber/slog-sentry" ) @@ -27,7 +27,7 @@ var ( ) func main() { - config := archives.NewDefaultConfig() + config := runtime.NewDefaultConfig() loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", []string{"archiver.toml"}) loader.MustLoad() @@ -84,17 +84,20 @@ func main() { config.DB += "?TimeZone=UTC" } - db, err := sqlx.Open("postgres", config.DB) + rt := &runtime.Runtime{ + Config: config, + } + + rt.DB, err = sqlx.Open("postgres", config.DB) if err != nil { logger.Error("error connecting to db", "error", err) } else { - db.SetMaxOpenConns(2) + rt.DB.SetMaxOpenConns(2) logger.Info("db ok", "state", "starting") } - var s3Client *s3x.Service if config.UploadToS3 { - s3Client, err = archives.NewS3Client(config) + rt.S3, err = archives.NewS3Client(config) if err != nil { logger.Error("unable to initialize s3 client", "error", err) } else { @@ -126,7 +129,7 @@ func main() { analytics.Start() if config.Once { - doArchival(db, config, s3Client) + doArchival(rt) } else { for { nextArchival := getNextArchivalTime(timeOfDay) @@ -135,7 +138,7 @@ func main() { logger.Info("sleeping until next archival", "sleep_time", napTime, "next_archival", nextArchival) time.Sleep(napTime) - doArchival(db, config, s3Client) + doArchival(rt) } } @@ -143,10 +146,10 @@ func main() { wg.Wait() } -func doArchival(db *sqlx.DB, cfg *archives.Config, s3Client *s3x.Service) { +func doArchival(rt *runtime.Runtime) { for { // try to archive all active orgs, and if it fails, wait 5 minutes and try again - err := archives.ArchiveActiveOrgs(db, cfg, s3Client) + err := archives.ArchiveActiveOrgs(rt) if err != nil { slog.Error("error archiving, will retry in 5 minutes", "error", err) time.Sleep(time.Minute * 5) diff --git a/archives/config.go b/runtime/config.go similarity index 99% rename from archives/config.go rename to runtime/config.go index de04870..fea8c32 100644 --- a/archives/config.go +++ b/runtime/config.go @@ -1,4 +1,4 @@ -package archives +package runtime import "os" diff --git a/runtime/runtime.go b/runtime/runtime.go new file mode 100644 index 0000000..9fa9aaa --- /dev/null +++ b/runtime/runtime.go @@ -0,0 +1,12 @@ +package runtime + +import ( + "github.com/jmoiron/sqlx" + "github.com/nyaruka/gocommon/s3x" +) + +type Runtime struct { + Config *Config + DB *sqlx.DB + S3 *s3x.Service +}