Skip to content

Commit

Permalink
rework event code to improve API errors
Browse files Browse the repository at this point in the history
One of the problems with the Events() API was that you had to call it in
a new goroutine. This meant the the error returned by it had to be read
back via a second channel. This cuased other bugs in the past but here
the biggest problem is that basic errors such as invalid since/until
options were not directly returned to the caller.
It meant in the API we were not able to write http code 200 quickly
because we always waited for the first event or error from the
channels. This in turn made some clients not happy as they assume the
server hangs on time out if no such events are generated.

To fix this we resturcture the entire event flow. First we spawn the
goroutine inside the eventer Read() function so not all the callers have
to. Then we can return the basic error quickly without the goroutine.
The caller then checks the error like any normal function and the API
can use this one to decide which status code to return.
Second we now return errors/event in one channel then the callers can
decide to ignore or log them which makes it a bit more clear.

Fixes c46884a ("podman events: check for an error after we finish reading events")
Fixes containers#23712

Signed-off-by: Paul Holzinger <[email protected]>
  • Loading branch information
Luap99 committed Oct 29, 2024
1 parent e34fb9c commit 658c039
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 189 deletions.
57 changes: 24 additions & 33 deletions cmd/podman/system/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/containers/podman/v5/cmd/podman/validate"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/domain/entities"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 {
eventOptions.FromStart = true
}
eventChannel := make(chan *events.Event, 1)
eventChannel := make(chan events.ReadResult, 1)
eventOptions.EventChan = eventChannel
errChannel := make(chan error)

var (
rpt *report.Formatter
Expand All @@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
}
}

go func() {
errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions)
close(errChannel)
}()

for {
select {
case event, ok := <-eventChannel:
if !ok {
// channel was closed we can exit
// read the error channel blocking to make sure we are not missing any errors (#23165)
return <-errChannel
}
switch {
case doJSON:
e := newEventFromLibpodEvent(event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil {
return err
}
default:
fmt.Println(event.ToHumanReadable(!noTrunc))
}
case err := <-errChannel:
// only exit in case of an error,
// otherwise keep reading events until the event channel is closed
err := registry.ContainerEngine().Events(context.Background(), eventOptions)
if err != nil {
return err
}

for evt := range eventChannel {
if evt.Error != nil {
logrus.Errorf("Failed to read event: %v", evt.Error)
continue
}
switch {
case doJSON:
e := newEventFromLibpodEvent(evt.Event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil {
return err
}
default:
fmt.Println(evt.Event.ToHumanReadable(!noTrunc))
}
}
return nil
}
49 changes: 12 additions & 37 deletions libpod/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"fmt"
"path/filepath"
"sync"

"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events"
Expand Down Expand Up @@ -187,53 +186,29 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error

// GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
eventChannel := make(chan *events.Event)
eventChannel := make(chan events.ReadResult)
options := events.ReadOptions{
EventChannel: eventChannel,
Filters: filters,
FromStart: true,
Stream: false,
}

logEvents := make([]*events.Event, 0, len(eventChannel))
readLock := sync.Mutex{}
readLock.Lock()
go func() {
for e := range eventChannel {
logEvents = append(logEvents, e)
}
readLock.Unlock()
}()

readErr := r.eventer.Read(ctx, options)
readLock.Lock() // Wait for the events to be consumed.
return logEvents, readErr
}

// GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// FIXME: events should be read in reverse order!
// https://github.com/containers/podman/issues/14579

// check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err
}
filters := []string{
fmt.Sprintf("container=%s", nameOrID),
fmt.Sprintf("event=%s", containerEvent),
"type=container",
}
containerEvents, err := r.GetEvents(ctx, filters)
err := r.eventer.Read(ctx, options)
if err != nil {
return nil, err
}
if len(containerEvents) < 1 {
return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound)

logEvents := make([]*events.Event, 0, len(eventChannel))
for evt := range eventChannel {
// we ignore any error here, this is only used on the backup
// GetExecDiedEvent() died path as best effort anyway
if evt.Error == nil {
logEvents = append(logEvents, evt.Event)
}
}
// return the last element in the slice
return containerEvents[len(containerEvents)-1], nil

return logEvents, nil
}

// GetExecDiedEvent takes a container name or ID, exec session ID, and returns
Expand Down
7 changes: 6 additions & 1 deletion libpod/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,15 @@ type Eventer interface {
String() string
}

type ReadResult struct {
Event *Event
Error error
}

// ReadOptions describe the attributes needed to read event logs
type ReadOptions struct {
// EventChannel is the comm path back to user
EventChannel chan *Event
EventChannel chan ReadResult
// Filters are key/value pairs that describe to limit output
Filters []string
// FromStart means you start reading from the start of the logs
Expand Down
66 changes: 40 additions & 26 deletions libpod/events/journal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"time"

Expand Down Expand Up @@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error {
}

// Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
Expand All @@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
return err
}
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
if retErr != nil {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}
}()
err = j.SetDataThreshold(0)
if err != nil {
logrus.Warnf("cannot set data threshold: %v", err)
return fmt.Errorf("cannot set data threshold for journal: %v", err)
}
// match only podman journal entries
podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
Expand Down Expand Up @@ -158,30 +162,40 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
}
}

for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
return err
}
// no entry == we hit the end
if entry == nil {
return nil
}
go func() {
defer close(options.EventChannel)
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}()
for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
options.EventChannel <- ReadResult{Error: err}
break
}
// no entry == we hit the end
if entry == nil {
break
}

newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
logrus.Errorf("Unable to decode event: %v", err)
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- ReadResult{Event: newEvent}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- newEvent
}
}
}()
return nil
}

func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
Expand Down
98 changes: 53 additions & 45 deletions libpod/events/logfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func (e EventLogFile) readRotateEvent(event *Event) (begin bool, end bool, err e

// Reads from the log file
func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
Expand Down Expand Up @@ -148,56 +147,65 @@ func (e EventLogFile) Read(ctx context.Context, options ReadOptions) error {
return err
}

var line *tail.Line
var ok bool
var skipRotate bool
for {
select {
case <-ctx.Done():
// the consumer has cancelled
t.Kill(errors.New("hangup by client"))
return nil
case line, ok = <-t.Lines:
if !ok {
// channel was closed
return nil
go func() {
defer close(options.EventChannel)
var line *tail.Line
var ok bool
var skipRotate bool
for {
select {
case <-ctx.Done():
// the consumer has cancelled
t.Kill(errors.New("hangup by client"))
return
case line, ok = <-t.Lines:
if !ok {
// channel was closed
return
}
// fallthrough
}
// fallthrough
}

event, err := newEventFromJSONString(line.Text)
if err != nil {
return err
}
switch event.Type {
case Image, Volume, Pod, Container, Network:
// no-op
case System:
begin, end, err := e.readRotateEvent(event)
event, err := newEventFromJSONString(line.Text)
if err != nil {
return err
err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
}
if begin && event.Time.After(readTime) {
// If the rotation event happened _after_ we
// started reading, we need to ignore/skip
// subsequent event until the end of the
// rotation.
skipRotate = true
logrus.Debugf("Skipping already read events after log-file rotation: %v", event)
} else if end {
// This rotate event
skipRotate = false
switch event.Type {
case Image, Volume, Pod, Container, Network:
// no-op
case System:
begin, end, err := e.readRotateEvent(event)
if err != nil {
options.EventChannel <- ReadResult{Error: err}
continue
}
if begin && event.Time.After(readTime) {
// If the rotation event happened _after_ we
// started reading, we need to ignore/skip
// subsequent event until the end of the
// rotation.
skipRotate = true
logrus.Debugf("Skipping already read events after log-file rotation: %v", event)
} else if end {
// This rotate event
skipRotate = false
}
default:
err := fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
options.EventChannel <- ReadResult{Error: err}
continue
}
if skipRotate {
continue
}
if applyFilters(event, filterMap) {
options.EventChannel <- ReadResult{Event: event}
}
default:
return fmt.Errorf("event type %s is not valid in %s", event.Type.String(), e.options.LogFilePath)
}
if skipRotate {
continue
}
if applyFilters(event, filterMap) {
options.EventChannel <- event
}
}
}()
return nil
}

// String returns a string representation of the logger
Expand Down
Loading

0 comments on commit 658c039

Please sign in to comment.