diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index 9ad2c8a0e06b..627ac576a398 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -44,6 +44,8 @@ type ExpectProcess struct { fpty *os.File wg sync.WaitGroup + readCloseCh chan struct{} // close it if async read goroutine exits + mu sync.Mutex // protects lines, count, cur, exitErr and exitCode lines []string count int // increment whenever new line gets added @@ -67,6 +69,7 @@ func NewExpectWithEnv(name string, args []string, env []string, serverProcessCon args: args, env: env, }, + readCloseCh: make(chan struct{}), } ep.cmd = commandFromConfig(ep.cfg) @@ -100,7 +103,10 @@ func (ep *ExpectProcess) Pid() int { } func (ep *ExpectProcess) read() { - defer ep.wg.Done() + defer func() { + ep.wg.Done() + close(ep.readCloseCh) + }() defer func(fpty *os.File) { err := fpty.Close() if err != nil { @@ -187,9 +193,25 @@ func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (s } } + select { + // NOTE: we wait readCloseCh for ep.read() to complete draining the log before acquring the lock. + case <-ep.readCloseCh: + case <-ctx.Done(): + return "", fmt.Errorf("failed to find match string: %w", ctx.Err()) + } + ep.mu.Lock() defer ep.mu.Unlock() + // retry it since we get all the log data + for i < len(ep.lines) { + line := ep.lines[i] + i++ + if f(line) { + return line, nil + } + } + lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL if lastLinesIndex < 0 { lastLinesIndex = 0 diff --git a/pkg/expect/expect_test.go b/pkg/expect/expect_test.go index 8b8311af2e03..2e03b0a0a442 100644 --- a/pkg/expect/expect_test.go +++ b/pkg/expect/expect_test.go @@ -209,3 +209,11 @@ func TestExitCodeAfterKill(t *testing.T) { assert.Equal(t, 137, code) assert.NoError(t, err) } + +func TestExpectForFailFastCommand(t *testing.T) { + ep, err := NewExpect("sh", "-c", `echo "curl: (59) failed setting cipher list"; exit 59`) + require.NoError(t, err) + + _, err = ep.Expect("failed setting cipher list") + require.NoError(t, err) +}