From e0c5cbe815914aea348ff6d6fb0cf936c687e982 Mon Sep 17 00:00:00 2001 From: Denis O Date: Tue, 13 Aug 2024 19:39:07 +0100 Subject: [PATCH] Engine output handling (#3339) * Engine output handling * Add buffering of output * Tests update --- engine/engine.go | 64 +++++++++++++++++++++------------ test/integration_engine_test.go | 2 ++ 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index e6e573e63..e95b9db09 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -448,38 +448,33 @@ func invoke(ctx context.Context, runOptions *ExecutionOptions, client *proto.Eng return nil, errors.WithStackTrace(err) } - cmdStdout := runOptions.CmdStdout - cmdStderr := runOptions.CmdStderr + var stdoutBuf, stderrBuf bytes.Buffer + stdout := io.MultiWriter(runOptions.CmdStdout, &stdoutBuf) + stderr := io.MultiWriter(runOptions.CmdStderr, &stderrBuf) - var stdoutBuf bytes.Buffer - var stderrBuf bytes.Buffer + var stdoutLineBuf, stderrLineBuf bytes.Buffer + var resultCode int - stdout := io.MultiWriter(cmdStdout, &stdoutBuf) - stderr := io.MultiWriter(cmdStderr, &stderrBuf) - // read stdout and stderr from engine - var resultCode = 0 for { runResp, err := response.Recv() - if err != nil { - break - } - if runResp == nil { + if err != nil || runResp == nil { break } - if runResp.GetStdout() != "" { - _, err := stdout.Write([]byte(runResp.GetStdout())) - if err != nil { - return nil, errors.WithStackTrace(err) - } + if err := processStream(runResp.GetStdout(), &stdoutLineBuf, stdout); err != nil { + return nil, errors.WithStackTrace(err) } - if runResp.GetStderr() != "" { - _, err := stderr.Write([]byte(runResp.GetStderr())) - if err != nil { - return nil, errors.WithStackTrace(err) - } + if err := processStream(runResp.GetStderr(), &stderrLineBuf, stderr); err != nil { + return nil, errors.WithStackTrace(err) } resultCode = int(runResp.GetResultCode()) } + if err := flushBuffer(&stdoutLineBuf, stdout); err != nil { + return nil, errors.WithStackTrace(err) + } + if err := flushBuffer(&stderrLineBuf, stderr); err != nil { + return nil, errors.WithStackTrace(err) + } + terragruntOptions.Logger.Debugf("Engine execution done in %v", terragruntOptions.WorkingDir) if resultCode != 0 { @@ -500,6 +495,31 @@ func invoke(ctx context.Context, runOptions *ExecutionOptions, client *proto.Eng return &cmdOutput, nil } +// processStream handles the character buffering and line printing for a given stream +func processStream(data string, lineBuf *bytes.Buffer, output io.Writer) error { + for _, ch := range data { + lineBuf.WriteByte(byte(ch)) + if ch == '\n' { + if _, err := fmt.Fprint(output, lineBuf.String()); err != nil { + return errors.WithStackTrace(err) + } + lineBuf.Reset() + } + } + return nil +} + +// flushBuffer prints any remaining data in the buffer +func flushBuffer(lineBuf *bytes.Buffer, output io.Writer) error { + if lineBuf.Len() > 0 { + _, err := fmt.Fprint(output, lineBuf.String()) + if err != nil { + return errors.WithStackTrace(err) + } + } + return nil +} + // initialize engine for working directory func initialize(ctx context.Context, runOptions *ExecutionOptions, client *proto.EngineClient) error { terragruntOptions := runOptions.TerragruntOptions diff --git a/test/integration_engine_test.go b/test/integration_engine_test.go index 8da9c91f4..12b962389 100644 --- a/test/integration_engine_test.go +++ b/test/integration_engine_test.go @@ -82,6 +82,8 @@ func TestEngineRunAllOpentofu(t *testing.T) { assert.Contains(t, stderr, "starting plugin:") assert.Contains(t, stderr, "plugin process exited:") + assert.Contains(t, stdout, "resource \"local_file\" \"test\"") + assert.Contains(t, stdout, "filename = \"./test.txt\"\n") assert.Contains(t, stdout, "OpenTofu has been successfull") assert.Contains(t, stdout, "Tofu Shutdown completed") assert.Contains(t, stdout, "Apply complete!")