Skip to content

Commit

Permalink
feat: reaper job for clearing out old RPC call history (#2273)
Browse files Browse the repository at this point in the history
Fixes #2273

Defaults to 24h
  • Loading branch information
gak authored Aug 7, 2024
1 parent 326bd22 commit f1abb89
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 0 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ issues:
- "fmt.Errorf can be replaced with errors.New"
- "fmt.Sprintf can be replaced with string concatenation"
- "strings.Title has been deprecated"
- "error returned from external package is unwrapped.*TranslatePGError"
22 changes: 22 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ type Config struct {
ControllerTimeout time.Duration `help:"Controller heartbeat timeout." default:"10s"`
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
ModuleUpdateFrequency time.Duration `help:"Frequency to send module updates." default:"30s"`
EventLogRetention *time.Duration `help:"Delete call logs after this time period. 0 to disable" env:"FTL_EVENT_LOG_RETENTION" default:"24h"`
ArtefactChunkSize int `help:"Size of each chunk streamed to the client." default:"1048576"`
EncryptionKeys
CommonConfig
Expand Down Expand Up @@ -326,6 +327,7 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s
// Singleton tasks use leases to only run on a single controller.
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleControllers, time.Second*2, time.Second*20, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reapStaleRunners, time.Second*2, time.Second, time.Second*10))
svc.tasks.Singleton(maybeDevelTask(svc.reapCallEvents, time.Second*2, time.Second, time.Second*10))
svc.tasks.Singleton(maybeDevelTask(svc.releaseExpiredReservations, time.Second*2, time.Second, time.Second*20))
svc.tasks.Singleton(maybeDevelTask(svc.reconcileDeployments, time.Second*2, time.Second, time.Second*5))
svc.tasks.Singleton(maybeDevelTask(svc.reconcileRunners, time.Second*2, time.Second, time.Second*5))
Expand Down Expand Up @@ -1794,6 +1796,26 @@ func (s *Service) syncSchema(ctx context.Context) {
}
}

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)

if s.config.EventLogRetention == nil {
logger.Tracef("Event log retention is disabled, will not prune.")
return time.Hour, nil
}

removed, err := s.dal.DeleteOldEvents(ctx, dal.EventTypeCall, *s.config.EventLogRetention)
if err != nil {
return 0, fmt.Errorf("failed to prune call events: %w", err)
}
if removed > 0 {
logger.Debugf("Pruned %d call events older than %s", removed, s.config.EventLogRetention)
}

// Prune every 5% of the retention period.
return *s.config.EventLogRetention / 20, nil
}

func extractIngressRoutingEntries(req *ftlv1.CreateDeploymentRequest) []dal.IngressRoutingEntry {
var ingressRoutes []dal.IngressRoutingEntry
for _, decl := range req.Schema.Decls {
Expand Down
5 changes: 5 additions & 0 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,11 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
}))
}

func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) {
count, err := d.db.DeleteOldEvents(ctx, age, eventType)
return count, dalerrs.TranslatePGError(err)
}

func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) {
rows, err := d.db.GetActiveRunners(ctx)
if err != nil {
Expand Down
95 changes: 95 additions & 0 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,98 @@ func assertEventsEqual(t *testing.T, expected, actual []Event) {
t.Helper()
assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual))
}

func TestDeleteOldEvents(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn, NoOpEncryptors())
assert.NoError(t, err)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
var testSha sha256.SHA256

t.Run("CreateArtefact", func(t *testing.T) {
testSha, err = dal.CreateArtefact(ctx, testContent)
assert.NoError(t, err)
})

module := &schema.Module{Name: "test"}
var deploymentKey model.DeploymentKey
t.Run("CreateDeployment", func(t *testing.T) {
deploymentKey, err = dal.CreateDeployment(ctx, "go", module, []DeploymentArtefact{{
Digest: testSha,
Executable: true,
Path: "dir/filename",
}}, nil, nil)
assert.NoError(t, err)
})

requestKey := model.NewRequestKey(model.OriginIngress, "GET /test")
// week old event
callEvent := &CallEvent{
Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond),
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Request: []byte("{}"),
Response: []byte(`{"time": "now"}`),
DestVerb: schema.Ref{Module: "time", Name: "time"},
}
t.Run("InsertCallEvent", func(t *testing.T) {
err = dal.InsertCallEvent(ctx, callEvent)
assert.NoError(t, err)
})
// hour old event
callEvent = &CallEvent{
Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond),
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Request: []byte("{}"),
Response: []byte(`{"time": "now"}`),
DestVerb: schema.Ref{Module: "time", Name: "time"},
}
t.Run("InsertCallEvent", func(t *testing.T) {
err = dal.InsertCallEvent(ctx, callEvent)
assert.NoError(t, err)
})

// week old event
logEvent := &LogEvent{
Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond),
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Level: int32(log.Warn),
Attributes: map[string]string{"attr": "value"},
Message: "A log entry",
}
t.Run("InsertLogEntry", func(t *testing.T) {
err = dal.InsertLogEvent(ctx, logEvent)
assert.NoError(t, err)
})
// hour old event
logEvent = &LogEvent{
Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond),
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey),
Level: int32(log.Warn),
Attributes: map[string]string{"attr": "value"},
Message: "A log entry",
}
t.Run("InsertLogEntry", func(t *testing.T) {
err = dal.InsertLogEvent(ctx, logEvent)
assert.NoError(t, err)
})

t.Run("DeleteOldEvents", func(t *testing.T) {
count, err := dal.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour)
assert.NoError(t, err)
assert.Equal(t, int64(1), count)

count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute)
assert.NoError(t, err)
assert.Equal(t, int64(2), count)

count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute)
assert.NoError(t, err)
assert.Equal(t, int64(0), count)
})
}
1 change: 1 addition & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ VALUES (
sqlc.arg('payload')
);

-- name: DeleteOldEvents :one
WITH deleted AS (
DELETE FROM events
WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL
AND type = sqlc.arg('type')
RETURNING 1
)
SELECT COUNT(*)
FROM deleted;

-- name: CreateRequest :exec
INSERT INTO requests (origin, "key", source_addr)
VALUES ($1, $2, $3);
Expand Down
18 changes: 18 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f1abb89

Please sign in to comment.