diff --git a/ghx/dagger.go b/ghx/dagger.go deleted file mode 100644 index 849e13aa..00000000 --- a/ghx/dagger.go +++ /dev/null @@ -1,45 +0,0 @@ -package main - -import ( - "context" - "os" - - "dagger.io/dagger" - - "github.com/aweris/gale/common/log" - "github.com/aweris/gale/ghx/journal" -) - -func getDaggerClient(ctx context.Context) (*dagger.Client, error) { - // initialize dagger client and set it to config - var opts []dagger.ClientOpt - - journalW, journalR := journal.Pipe() - - // Just print the same logger to stdout for now. We'll replace this with something interesting later. - go logJournal(journalR) - - opts = append(opts, dagger.WithLogOutput(journalW)) - - return dagger.Connect(ctx, opts...) -} - -func logJournal(reader journal.Reader) { - cp := NewLoggingCommandProcessor() - - for { - entry, ok := reader.ReadEntry() - if !ok { - break - } - - if entry.Type == journal.EntryTypeInternal && os.Getenv("RUNNER_DEBUG") != "1" { - return - } - - // only process logging commands so it won't need context to process. It's okay to send nil context. - if err := cp.ProcessOutput(nil, entry.Message); err != nil { - log.Errorf("failed to process journal entry", "message", entry.Message, "error", err) - } - } -} diff --git a/ghx/executor_container.go b/ghx/executor_container.go index 7d215380..065ebe9f 100644 --- a/ghx/executor_container.go +++ b/ghx/executor_container.go @@ -26,7 +26,7 @@ func NewContainerExecutorFromStepDocker(sd *StepDocker) *ContainerExecutor { return &ContainerExecutor{ entrypoint: sd.Step.With["entrypoint"], args: []string{sd.Step.With["args"]}, - cp: NewEnvCommandsProcessor(), + cp: NewCommandProcessor(), container: sd.container, } } @@ -35,7 +35,7 @@ func NewContainerExecutorFromStepAction(sa *StepAction, entrypoint string) *Cont return &ContainerExecutor{ entrypoint: entrypoint, args: sa.Action.Meta.Runs.Args, - cp: NewEnvCommandsProcessor(), + cp: NewCommandProcessor(), container: sa.container, } } diff --git a/ghx/journal/entry.go b/ghx/journal/entry.go deleted file mode 100644 index 9e22e348..00000000 --- a/ghx/journal/entry.go +++ /dev/null @@ -1,93 +0,0 @@ -package journal - -import ( - "strconv" - "strings" - "time" -) - -// EntryType is the type of journal entry -type EntryType string - -const ( - // EntryTypeInternal is a log entry from dagger itself , not from the container execution. - EntryTypeInternal EntryType = "internal" - - // EntryTypeExecution is a log entry from the container execution. - EntryTypeExecution EntryType = "execution" -) - -// Entry is a single log entry from the journal -type Entry struct { - Raw string - ID int - Index int - Type EntryType - ElapsedTme time.Duration - Message string -} - -// String returns the raw log line -func (e *Entry) String() string { - return e.Raw -} - -// parseEntry parses a log line into an entry -func parseEntry(id int, line string) *Entry { - // Create a new entry with the raw log line and the ID. We'll fill in the rest later. - entry := &Entry{Raw: line, ID: id} - - var ( - index int - info string - message string - ) - - // To convert log lines into entries, we need to parse the log line into its parts. We'll do this by splitting - // the line on spaces and then parsing each part. We'll then use the parts to fill in the entry. - // - // To make this easier, we'll make some assumptions about the log format. This will make the parsing easier according - // to the following rules based on dagger log format: - // - Message format is - // - The index always an integer suffixed with : (e.g. 1:, 2:, etc.) - // - The info could be a duration or a string. If it's a duration, it will be surrounded by [] and will have - // string representation of a time.Duration (e.g. [0.217s]), otherwise it will be alphanumeric - // (e.g. exec, resolve, etc.) - // - The message is the rest of the line, which could be empty - // - The info and message are optional - // - If the info is a duration, the message is execution log, otherwise it's an internal dagger log - parts := strings.Split(line, " ") - - // TODO: assume we'll be dealing with valid log lines for now. Make this more robust later. - - index, _ = strconv.Atoi(strings.TrimSuffix(parts[0], ":")) - - if len(parts) > 1 { - info = strings.TrimSpace(parts[1]) - - if len(parts) > 2 { - message = strings.Join(parts[2:], " ") // not using TrimSpace here because we want to preserve whitespace in the message - } - } - - // if the info is a duration, then this is an execution log - if strings.HasPrefix(info, "[") && strings.HasSuffix(info, "s]") { - duration, err := time.ParseDuration(strings.TrimSuffix(strings.TrimPrefix(info, "["), "]")) - if err == nil { - entry.Index = index - entry.Type = EntryTypeExecution - entry.ElapsedTme = duration - entry.Message = message - - return entry - } - } - - // if we get here, then this is an internal log, so we'll fill in the entry accordingly. - // we're not interested further breaking down the log information for now. - entry.Index = index - entry.Type = EntryTypeInternal - entry.Message = strings.Join([]string{info, message}, " ") - - return entry -} diff --git a/ghx/journal/pipe.go b/ghx/journal/pipe.go deleted file mode 100644 index 18c78059..00000000 --- a/ghx/journal/pipe.go +++ /dev/null @@ -1,92 +0,0 @@ -package journal - -import ( - "bufio" - "io" - "strings" - "sync" - "sync/atomic" -) - -// Writer is an interface for writing to a journal. It is a combination of the io.Writer and io.Closer interfaces. -// This interface intended to be used by the dagger client to write to the journal. -type Writer interface { - io.Writer - io.Closer -} - -// Reader is an interface for reading from a journal. -type Reader interface { - ReadEntry() (*Entry, bool) -} - -var ( - _ Writer = new(unboundedPipe) - _ Reader = new(unboundedPipe) -) - -// unboundedPipe is a simple implementation of the Writer and Reader interfaces. -type unboundedPipe struct { - cond *sync.Cond - counter *atomic.Uint64 - buffer []*Entry - closed bool -} - -// Pipe creates a new Writer and Reader pair that can be used to write and read from a journal. -func Pipe() (Writer, Reader) { - pipe := &unboundedPipe{ - cond: sync.NewCond(new(sync.Mutex)), - counter: &atomic.Uint64{}, - } - return pipe, pipe -} - -func (p *unboundedPipe) Write(data []byte) (n int, err error) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - raw := string(data) - - scanner := bufio.NewScanner(strings.NewReader(raw)) - - // Loop through each line and process it - for scanner.Scan() { - line := scanner.Text() - - if line == "" { - continue - } - - p.buffer = append(p.buffer, parseEntry(int(p.counter.Add(1)), line)) - p.cond.Signal() - } - - return len(data), nil -} - -func (p *unboundedPipe) ReadEntry() (*Entry, bool) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - for len(p.buffer) == 0 && !p.closed { - p.cond.Wait() - } - - if len(p.buffer) == 0 && p.closed { - return nil, false - } - - value := p.buffer[0] - p.buffer = p.buffer[1:] - return value, true -} - -func (p *unboundedPipe) Close() error { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - p.closed = true - p.cond.Broadcast() - return nil -} diff --git a/ghx/main.go b/ghx/main.go index 491eb522..b4041ead 100644 --- a/ghx/main.go +++ b/ghx/main.go @@ -5,6 +5,8 @@ import ( "fmt" "os" + "dagger.io/dagger" + "github.com/aweris/gale/common/fs" "github.com/aweris/gale/ghx/context" ) @@ -12,7 +14,7 @@ import ( func main() { stdctx := stdContext.Background() - client, err := getDaggerClient(stdctx) + client, err := dagger.Connect(stdctx, dagger.WithLogOutput(os.Stdout)) if err != nil { fmt.Printf("failed to get dagger client: %v", err) os.Exit(1) diff --git a/ghx/workflow_commands.go b/ghx/workflow_commands.go index c5c83238..302ba995 100644 --- a/ghx/workflow_commands.go +++ b/ghx/workflow_commands.go @@ -44,30 +44,6 @@ type CommandProcessor struct { exclude map[CommandName]bool // exclude is a map of command names to exclude from processing } -// NewLoggingCommandProcessor creates a new command processor that processes logging commands. -func NewLoggingCommandProcessor() *CommandProcessor { - return NewCommandProcessor( - CommandNameSetEnv, - CommandNameSetOutput, - CommandNameSaveState, - CommandNameAddMask, - CommandNameAddMatcher, - CommandNameAddPath, - ) -} - -// NewEnvCommandsProcessor returns a new command processor that processes commands that manipulate execution environment. -func NewEnvCommandsProcessor() *CommandProcessor { - return NewCommandProcessor( - CommandNameGroup, - CommandNameEndGroup, - CommandNameDebug, - CommandNameError, - CommandNameWarning, - CommandNameNotice, - ) -} - func NewCommandProcessor(excluded ...CommandName) *CommandProcessor { exclude := make(map[CommandName]bool, len(excluded))