Skip to content

Commit

Permalink
fix: output parsing test (#4196)
Browse files Browse the repository at this point in the history
* fix: successful output parsing test

* fix: check if executor log is already in json output

* fix: ignore queued

* fix: change container logs

* fixL ignore scraper result

* fix: add nil checks

* fix: add unit tests

* fix: golint

* fix: change condition

* fix: update execution result on failure
  • Loading branch information
vsukhin authored Jul 21, 2023
1 parent 350e8de commit 02d4e2d
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 4 deletions.
19 changes: 18 additions & 1 deletion pkg/executor/containerexecutor/containerexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,24 @@ func (c *ContainerExecutor) updateResultsFromPod(
}

executorLogs = append(executorLogs, scraperLogs...)
execution.ExecutionResult.Output = string(executorLogs)

// parse container output log (mixed JSON and plain text stream)
executionResult, output, err := output.ParseContainerOutput(executorLogs)
if err != nil {
l.Errorw("parse output error", "error", err)
execution.ExecutionResult.Output = output
execution.ExecutionResult.Err(err)
err = c.repository.UpdateResult(ctx, execution.Id, *execution)
if err != nil {
l.Infow("Update result", "error", err)
}
return execution.ExecutionResult, err
}

if executionResult != nil {
execution.ExecutionResult = executionResult
}
execution.ExecutionResult.Output = output

if execution.ExecutionResult.IsFailed() && execution.ExecutionResult.ErrorMessage == "" {
execution.ExecutionResult.ErrorMessage = executor.GetPodErrorMessage(latestExecutorPod)
Expand Down
125 changes: 123 additions & 2 deletions pkg/executor/output/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,40 @@ func ParseRunnerOutput(b []byte) (*testkube.ExecutionResult, error) {
return result, nil
}

// ParseContainerOutput goes over the raw logs in b and parses possible container output
// The input is a mixed stream of the json form and plain text
// runner execution started ------------
// {"type": "result", "result": {"id": "2323", "output": "-----"}, "time": "..."}
func ParseContainerOutput(b []byte) (*testkube.ExecutionResult, string, error) {
result := &testkube.ExecutionResult{}
if len(b) == 0 {
return nil, "", nil
}

logs, err := parseContainerLogs(b)
if err != nil {
err = fmt.Errorf("could not parse logs \"%s\": %v", b, err.Error())
return nil, err.Error(), err
}

output := sanitizeLogs(logs)
log := getDecidingContainerLogLine(logs)
if log == nil {
return nil, output, nil
}

switch log.Type_ {
case TypeResult:
if log.Result != nil {
result = log.Result
}
case TypeError:
result.Err(fmt.Errorf(log.Content))
}

return result, output, nil
}

// sanitizeLogs creates a human-readable string from a list of Outputs
func sanitizeLogs(logs []Output) string {
var sb strings.Builder
Expand Down Expand Up @@ -138,6 +172,52 @@ func parseLogs(b []byte) ([]Output, error) {
return logs, nil
}

// parseContainerLogs gets a list of Outputs from raw logs
func parseContainerLogs(b []byte) ([]Output, error) {
logs := []Output{}
reader := bufio.NewReader(bytes.NewReader(b))

for {
b, err := utils.ReadLongLine(reader)
if err != nil {
if err == io.EOF {
err = nil
break
}

return logs, fmt.Errorf("could not read line: %w", err)
}

log, err := GetLogEntry(b)
if log.Type_ == TypeParsingError || log.Type_ == TypeUnknown || err != nil {
// try to read in case of some lines which we couldn't parse
// sometimes we're not able to control all stdout messages from libs
logs = append(logs, Output{
Type_: TypeLogLine,
Content: string(b),
})

continue
}

if log.Type_ == TypeResult &&
log.Result != nil && log.Result.Status != nil {
message := getResultMessage(*log.Result)
logs = append(logs, Output{
Type_: TypeResult,
Content: message,
Result: log.Result,
})

continue
}

logs = append(logs, log)
}

return logs, nil
}

// getDecidingLogLine returns the log line of type result
// if there is no log line of type result it will return the last log based on time
// if there are no timestamps, it will return the last error log from the list,
Expand Down Expand Up @@ -178,6 +258,47 @@ func getDecidingLogLine(logs []Output) *Output {
return &resultLog
}

// getDecidingContainerLogLine returns the log line of type result
// if there are no timestamps, it will return the last error log from the list,
// if there are no errors, nothing is returned
func getDecidingContainerLogLine(logs []Output) *Output {
if len(logs) == 0 {
return nil
}

resultLog := Output{
Type_: TypeLogLine,
Time: time.Time{},
}

for _, log := range logs {
if log.Type_ == TypeResult &&
(log.Result == nil || log.Result.Status == nil || log.Result.IsRunning()) {
// this is the result of the init-container or scraper pod on success, let's ignore it
continue
}

if moreSevere(log.Type_, resultLog.Type_) {
resultLog = log
continue
}

if sameSeverity(log.Type_, resultLog.Type_) {
if log.Time.Before(resultLog.Time) {
continue
}

resultLog = log
}
}

if resultLog.Type_ != TypeResult && resultLog.Type_ != TypeError {
return nil
}

return &resultLog
}

// getResultMessage returns a message from the result regardless of its type
func getResultMessage(result testkube.ExecutionResult) string {
if result.IsFailed() {
Expand All @@ -187,7 +308,7 @@ func getResultMessage(result testkube.ExecutionResult) string {
return result.Output
}

return fmt.Sprintf("%v", result.Status)
return fmt.Sprintf("%s", *result.Status)
}

// sameSeverity decides if a and b are of the same severity type
Expand All @@ -210,5 +331,5 @@ func moreSevere(a string, b string) bool {
}

// a is either log or event
return !(b == TypeResult || b == TypeError)
return b != TypeResult && b != TypeError
}
Loading

0 comments on commit 02d4e2d

Please sign in to comment.