Skip to content

Commit

Permalink
fix: correctly record event times (#3254)
Browse files Browse the repository at this point in the history
  • Loading branch information
alecthomas authored Oct 30, 2024
1 parent 8d38463 commit 46fdd15
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 42 deletions.
1 change: 1 addition & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ lint-frontend: build-frontend
lint-backend:
@golangci-lint run --new-from-rev=$(git merge-base origin/main HEAD) ./...
@lint-commit-or-rollback ./backend/...
@go-check-sumtype ./...

lint-scripts:
#!/bin/bash
Expand Down
8 changes: 3 additions & 5 deletions backend/controller/timeline/events_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ type Call struct {
Response either.Either[*ftlv1.CallResponse, error]
}

func (c *Call) inEvent() {}

func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call *Call) error {
callEvent := callToCallEvent(call)
func (c *Call) toEvent() (Event, error) { return callToCallEvent(c), nil } //nolint:unparam

func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, callEvent *CallEvent) error {
var sourceModule, sourceVerb optional.Option[string]
if sr, ok := callEvent.SourceVerb.Get(); ok {
sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name)
Expand Down Expand Up @@ -95,7 +93,7 @@ func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call
}

err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
DeploymentKey: call.DeploymentKey,
DeploymentKey: callEvent.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
TimeStamp: callEvent.Time,
Expand Down
11 changes: 8 additions & 3 deletions backend/controller/timeline/events_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ type CronScheduled struct {
Error optional.Option[string]
}

func (*CronScheduled) inEvent() {}
func (e *CronScheduled) toEvent() (Event, error) { //nolint:unparam
return &CronScheduledEvent{
CronScheduled: *e,
Duration: time.Since(e.Time),
}, nil
}

type eventCronScheduledJSON struct {
DurationMS int64 `json:"duration_ms"`
Expand All @@ -43,9 +48,9 @@ type eventCronScheduledJSON struct {
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduled) error {
func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduledEvent) error {
cronJSON := eventCronScheduledJSON{
DurationMS: time.Since(event.Time).Milliseconds(),
DurationMS: event.Duration.Milliseconds(),
ScheduledAt: event.ScheduledAt,
Schedule: event.Schedule,
Error: event.Error,
Expand Down
47 changes: 29 additions & 18 deletions backend/controller/timeline/events_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ type Ingress struct {
Error optional.Option[string]
}

func (*Ingress) inEvent() {}

func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *Ingress) error {
func (ingress *Ingress) toEvent() (Event, error) {
requestBody := ingress.RequestBody
if len(requestBody) == 0 {
requestBody = []byte("{}")
Expand All @@ -78,29 +76,42 @@ func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, in
responseBody = []byte("{}")
}

if len(responseBody) == 0 {
responseBody = []byte("{}")
}

reqHeaderBytes, err := json.Marshal(ingress.RequestHeaders)
if err != nil {
return fmt.Errorf("failed to marshal request header: %w", err)
return nil, fmt.Errorf("failed to marshal request header: %w", err)
}

respHeaderBytes, err := json.Marshal(ingress.ResponseHeaders)
if err != nil {
return fmt.Errorf("failed to marshal response header: %w", err)
return nil, fmt.Errorf("failed to marshal response header: %w", err)
}

ingressJSON := eventIngressJSON{
DurationMS: time.Since(ingress.StartTime).Milliseconds(),
return &IngressEvent{
DeploymentKey: ingress.DeploymentKey,
RequestKey: optional.Some(ingress.RequestKey),
Verb: *ingress.Verb,
Method: ingress.RequestMethod,
Path: ingress.RequestPath,
StatusCode: ingress.ResponseStatus,
Request: json.RawMessage(requestBody),
RequestHeader: json.RawMessage(reqHeaderBytes),
Response: json.RawMessage(responseBody),
ResponseHeader: json.RawMessage(respHeaderBytes),
Time: ingress.StartTime,
Duration: time.Since(ingress.StartTime),
Request: requestBody,
RequestHeader: reqHeaderBytes,
Response: responseBody,
ResponseHeader: respHeaderBytes,
Error: ingress.Error,
}, nil
}

func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *IngressEvent) error {
ingressJSON := eventIngressJSON{
DurationMS: ingress.Duration.Milliseconds(),
Method: ingress.Method,
Path: ingress.Path,
StatusCode: ingress.StatusCode,
Request: ingress.Request,
RequestHeader: ingress.RequestHeader,
Response: ingress.Response,
ResponseHeader: ingress.ResponseHeader,
Error: ingress.Error,
}

Expand All @@ -115,12 +126,12 @@ func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, in
return fmt.Errorf("failed to encrypt ingress payload: %w", err)
}

log.FromContext(ctx).Warnf("Inserting ingress event for %s %s", ingress.RequestKey, ingress.RequestPath)
log.FromContext(ctx).Warnf("Inserting ingress event for %s %s", ingress.RequestKey, ingress.Path)

err = libdal.TranslatePGError(querier.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{
DeploymentKey: ingress.DeploymentKey,
RequestKey: optional.Some(ingress.RequestKey.String()),
TimeStamp: ingress.StartTime,
TimeStamp: ingress.Time,
Module: ingress.Verb.Module,
Verb: ingress.Verb.Name,
IngressType: "http",
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/timeline/events_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Log struct {
Error optional.Option[string]
}

func (l *Log) inEvent() {}
func (l *Log) toEvent() (Event, error) { return &LogEvent{Log: *l}, nil } //nolint:unparam

type LogEvent struct {
ID int64
Expand All @@ -40,7 +40,7 @@ type eventLogJSON struct {
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *Log) error {
func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *LogEvent) error {
var requestKey optional.Option[string]
if name, ok := log.RequestKey.Get(); ok {
requestKey = optional.Some(name.String())
Expand Down
27 changes: 17 additions & 10 deletions backend/controller/timeline/timeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,21 @@ type Event interface {

// InEvent is a marker interface for events that are inserted into the timeline.
type InEvent interface {
inEvent()
toEvent() (Event, error)
}

type Service struct {
ctx context.Context
conn *stdsql.DB
encryption *encryption.Service
events chan InEvent
events chan Event
lastDroppedError atomic.Value[time.Time]
lastFailedError atomic.Value[time.Time]
}

func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service {
var s *Service
events := make(chan InEvent, 1000)
events := make(chan Event, 1000)
s = &Service{
ctx: ctx,
conn: conn,
Expand All @@ -72,7 +72,12 @@ func (s *Service) DeleteOldEvents(ctx context.Context, eventType EventType, age
}

// EnqueueEvent asynchronously enqueues an event for insertion into the timeline.
func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) {
func (s *Service) EnqueueEvent(ctx context.Context, inEvent InEvent) {
event, err := inEvent.toEvent()
if err != nil {
log.FromContext(ctx).Warnf("Failed to convert event to event: %v", err)
return
}
select {
case s.events <- event:
default:
Expand All @@ -85,7 +90,7 @@ func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) {

func (s *Service) processEvents() {
lastFlush := time.Now()
buffer := make([]InEvent, 0, maxBatchSize)
buffer := make([]Event, 0, maxBatchSize)
for {
select {
case event := <-s.events:
Expand All @@ -108,7 +113,7 @@ func (s *Service) processEvents() {
}

// Flush all events in the buffer to the database in a single transaction.
func (s *Service) flushEvents(events []InEvent) {
func (s *Service) flushEvents(events []Event) {
logger := log.FromContext(s.ctx).Scope("timeline")
tx, err := s.conn.Begin()
if err != nil {
Expand All @@ -121,14 +126,16 @@ func (s *Service) flushEvents(events []InEvent) {
for _, event := range events {
var err error
switch e := event.(type) {
case *Call:
case *CallEvent:
err = s.insertCallEvent(s.ctx, querier, e)
case *Log:
case *LogEvent:
err = s.insertLogEvent(s.ctx, querier, e)
case *Ingress:
case *IngressEvent:
err = s.insertHTTPIngress(s.ctx, querier, e)
case *CronScheduled:
case *CronScheduledEvent:
err = s.insertCronScheduledEvent(s.ctx, querier, e)
case *DeploymentCreatedEvent, *DeploymentUpdatedEvent:
// TODO: Implement
default:
panic(fmt.Sprintf("unexpected event type: %T", e))
}
Expand Down
45 changes: 45 additions & 0 deletions examples/go/time/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,57 @@ replace github.com/TBD54566975/ftl => ../../..
require github.com/TBD54566975/ftl v0.0.0-00010101000000-000000000000

require (
al.essio.dev/pkg/shellescape v1.5.1 // indirect
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/XSAM/otelsql v0.35.0 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v1.2.1 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/danieljoos/wincred v1.2.2 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.7.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.6 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
Loading

0 comments on commit 46fdd15

Please sign in to comment.