Skip to content

Commit

Permalink
Dangerous Context Enrichment
Browse files Browse the repository at this point in the history
by passing the Sentry Context down our abstraction stack.
This included changes to the complex context management of a Command Execution.
  • Loading branch information
mpass99 committed Feb 3, 2023
1 parent 2650efb commit 4550a45
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 37 deletions.
4 changes: 2 additions & 2 deletions internal/api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func (r *RunnerController) connectToRunner(writer http.ResponseWriter, request *
log.WithField("runnerId", targetRunner.ID()).
WithField("executionID", logging.RemoveNewlineSymbol(executionID)).
Info("Running execution")
logging.StartSpan("api.runner.connect", "Execute Interactively", request.Context(), func(_ context.Context) {
logging.StartSpan("api.runner.connect", "Execute Interactively", request.Context(), func(ctx context.Context) {
exit, cancel, err := targetRunner.ExecuteInteractively(executionID,
proxy.Input, proxy.Output.StdOut(), proxy.Output.StdErr())
proxy.Input, proxy.Output.StdOut(), proxy.Output.StdErr(), ctx)
if err != nil {
log.WithError(err).Warn("Cannot execute request.")
return // The proxy is stopped by the deferred cancel.
Expand Down
19 changes: 16 additions & 3 deletions internal/nomad/api_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"
nomadApi "github.com/hashicorp/nomad/api"
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/pkg/logging"
"io"
)

Expand Down Expand Up @@ -88,18 +89,30 @@ func (nc *nomadAPIClient) Execute(runnerID string,
ctx context.Context, command []string, tty bool,
stdin io.Reader, stdout, stderr io.Writer,
) (int, error) {
allocations, _, err := nc.client.Jobs().Allocations(runnerID, false, nil)
var allocations []*nomadApi.AllocationListStub
var err error
logging.StartSpan("nomad.execute.list", "List Allocations for id", ctx, func(_ context.Context) {
allocations, _, err = nc.client.Jobs().Allocations(runnerID, false, nil)
})
if err != nil {
return 1, fmt.Errorf("error retrieving allocations for runner: %w", err)
}
if len(allocations) == 0 {
return 1, ErrorNoAllocationFound
}
allocation, _, err := nc.client.Allocations().Info(allocations[0].ID, nil)

var allocation *nomadApi.Allocation
logging.StartSpan("nomad.execute.info", "List Data of Allocation", ctx, func(_ context.Context) {
allocation, _, err = nc.client.Allocations().Info(allocations[0].ID, nil)
})
if err != nil {
return 1, fmt.Errorf("error retrieving allocation info: %w", err)
}
exitCode, err := nc.client.Allocations().Exec(ctx, allocation, TaskName, tty, command, stdin, stdout, stderr, nil, nil)

var exitCode int
logging.StartSpan("nomad.execute.exec", "Execute Command in Allocation", ctx, func(ctx context.Context) {
exitCode, err = nc.client.Allocations().Exec(ctx, allocation, TaskName, tty, command, stdin, stdout, stderr, nil, nil)
})
switch {
case err == nil:
return exitCode, nil
Expand Down
20 changes: 13 additions & 7 deletions internal/nomad/nomad.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,22 @@ func (a *APIClient) executeCommandInteractivelyWithStderr(allocationID string, c
defer cancel()

// Catch stderr in separate execution.
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
if err != nil {
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
}
stderrExitChan <- exit
logging.StartSpan("nomad.execute.stderr", "Execution for separate StdErr", ctx, func(ctx context.Context) {
exit, err := a.Execute(allocationID, ctx, prepareCommandTTYStdErr(currentNanoTime, privilegedExecution), true,
nullio.Reader{Ctx: readingContext}, stderr, io.Discard)
if err != nil {
log.WithError(err).WithField("runner", allocationID).Warn("Stderr task finished with error")
}
stderrExitChan <- exit
})
}()

command = prepareCommandTTY(command, currentNanoTime, privilegedExecution)
exit, err := a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard)
var exit int
var err error
logging.StartSpan("nomad.execute.tty", "Interactive Execution", ctx, func(ctx context.Context) {
exit, err = a.Execute(allocationID, ctx, command, true, stdin, stdout, io.Discard)
})

// Wait until the stderr catch command finished to make sure we receive all output.
<-stderrExitChan
Expand Down
2 changes: 1 addition & 1 deletion internal/nomad/nomad_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (s *ExecuteCommandTestSuite) TestWithoutSeparateStderrReturnsCommandError()

func (s *ExecuteCommandTestSuite) mockExecute(command interface{}, exitCode int,
err error, runFunc func(arguments mock.Arguments)) *mock.Call {
return s.apiMock.On("Execute", s.allocationID, s.ctx, command, withTTY,
return s.apiMock.On("Execute", s.allocationID, mock.Anything, command, withTTY,
mock.Anything, mock.Anything, mock.Anything).
Run(runFunc).
Return(exitCode, err)
Expand Down
3 changes: 2 additions & 1 deletion internal/runner/aws_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func (w *AWSFunctionWorkload) ExecutionExists(id string) bool {
return ok
}

func (w *AWSFunctionWorkload) ExecuteInteractively(id string, _ io.ReadWriter, stdout, stderr io.Writer) (
func (w *AWSFunctionWorkload) ExecuteInteractively(
id string, _ io.ReadWriter, stdout, stderr io.Writer, _ context.Context) (
<-chan ExitInfo, context.CancelFunc, error) {
w.ResetTimeout()
request, ok := w.executions.Pop(id)
Expand Down
9 changes: 6 additions & 3 deletions internal/runner/aws_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
cancel()

r.StoreExecution(tests.DefaultEnvironmentIDAsString, &dto.ExecutionRequest{})
exit, _, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
exit, _, err := r.ExecuteInteractively(
tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard, context.Background())
require.NoError(t, err)
<-exit
assert.True(t, awsMock.hasConnected)
Expand All @@ -89,7 +90,8 @@ func TestAWSFunctionWorkload_ExecuteInteractively(t *testing.T) {
request := &dto.ExecutionRequest{Command: command}
r.StoreExecution(tests.DefaultEnvironmentIDAsString, request)

_, cancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
_, cancel, err := r.ExecuteInteractively(
tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard, context.Background())
require.NoError(t, err)
<-time.After(tests.ShortTimeout)
cancel()
Expand Down Expand Up @@ -123,7 +125,8 @@ func TestAWSFunctionWorkload_UpdateFileSystem(t *testing.T) {

err = r.UpdateFileSystem(&dto.UpdateFileSystemRequest{Copy: []dto.File{myFile}}, context.Background())
assert.NoError(t, err)
_, execCancel, err := r.ExecuteInteractively(tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard)
_, execCancel, err := r.ExecuteInteractively(
tests.DefaultEnvironmentIDAsString, nil, io.Discard, io.Discard, context.Background())
require.NoError(t, err)
<-time.After(tests.ShortTimeout)
execCancel()
Expand Down
15 changes: 11 additions & 4 deletions internal/runner/nomad_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (r *NomadJob) ExecuteInteractively(
id string,
stdin io.ReadWriter,
stdout, stderr io.Writer,
requestCtx context.Context,
) (<-chan ExitInfo, context.CancelFunc, error) {
request, ok := r.executions.Pop(id)
if !ok {
Expand All @@ -117,13 +118,19 @@ func (r *NomadJob) ExecuteInteractively(

r.ResetTimeout()

command, ctx, cancel := prepareExecution(request, r.ctx)
// We have to handle three contexts:
// - requestCtx: The context of the http request (including Sentry data)
// - r.ctx: The context of the runner (runner timeout)
// - executionCtx: The context of the execution (execution timeout)
// -> The cancel function of the executionCtx might be triggered (when the client connection breaks).

command, executionCtx, cancel := prepareExecution(request, r.ctx)
exitInternal := make(chan ExitInfo)
exit := make(chan ExitInfo, 1)
ctxExecute, cancelExecute := context.WithCancel(r.ctx)
ctxExecute, cancelExecute := context.WithCancel(requestCtx)

go r.executeCommand(ctxExecute, command, request.PrivilegedExecution, stdin, stdout, stderr, exitInternal)
go r.handleExitOrContextDone(ctx, cancelExecute, exitInternal, exit, stdin)
go r.handleExitOrContextDone(executionCtx, cancelExecute, exitInternal, exit, stdin)

return exit, cancel, nil
}
Expand Down Expand Up @@ -166,7 +173,7 @@ func (r *NomadJob) UpdateFileSystem(copyRequest *dto.UpdateFileSystemRequest, ct
updateFileCommand := (&dto.ExecutionRequest{Command: fileDeletionCommand + copyCommand}).FullCommand()
stdOut := bytes.Buffer{}
stdErr := bytes.Buffer{}
exitCode, err := r.api.ExecuteCommand(r.id, context.Background(), updateFileCommand, false,
exitCode, err := r.api.ExecuteCommand(r.id, ctx, updateFileCommand, false,
nomad.PrivilegedExecution, // All files should be written and owned by a privileged user #211.
&tarBuffer, &stdOut, &stdErr)
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions internal/runner/nomad_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func (s *ExecuteInteractivelyTestSuite) SetupTest() {
}

func (s *ExecuteInteractivelyTestSuite) TestReturnsErrorWhenExecutionDoesNotExist() {
_, _, err := s.runner.ExecuteInteractively("non-existent-id", nil, nil, nil)
_, _, err := s.runner.ExecuteInteractively("non-existent-id", nil, nil, nil, context.Background())
s.ErrorIs(err, ErrorUnknownExecution)
}

func (s *ExecuteInteractivelyTestSuite) TestCallsApi() {
request := &dto.ExecutionRequest{Command: "echo 'Hello World!'"}
s.runner.StoreExecution(defaultExecutionID, request)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, nil, nil, nil)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, nil, nil, nil, context.Background())
s.Require().NoError(err)

time.Sleep(tests.ShortTimeout)
Expand All @@ -155,7 +155,7 @@ func (s *ExecuteInteractivelyTestSuite) TestReturnsAfterTimeout() {
timeLimit := 1
executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit}
s.runner.StoreExecution(defaultExecutionID, executionRequest)
exit, _, err := s.runner.ExecuteInteractively(defaultExecutionID, &nullio.ReadWriter{}, nil, nil)
exit, _, err := s.runner.ExecuteInteractively(defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background())
s.Require().NoError(err)

select {
Expand Down Expand Up @@ -191,7 +191,8 @@ func (s *ExecuteInteractivelyTestSuite) TestSendsSignalAfterTimeout() {
timeLimit := 1
executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit}
s.runner.StoreExecution(defaultExecutionID, executionRequest)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil)
_, _, err := s.runner.ExecuteInteractively(
defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil, context.Background())
s.Require().NoError(err)
log.Info("Before waiting")
select {
Expand All @@ -210,7 +211,8 @@ func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal(
executionRequest := &dto.ExecutionRequest{TimeLimit: timeLimit}
s.runner.cancel = func() {}
s.runner.StoreExecution(defaultExecutionID, executionRequest)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil)
_, _, err := s.runner.ExecuteInteractively(
defaultExecutionID, bytes.NewBuffer(make([]byte, 1)), nil, nil, context.Background())
s.Require().NoError(err)
<-time.After(executionTimeoutGracePeriod + time.Duration(timeLimit)*time.Second + tests.ShortTimeout)
s.manager.AssertCalled(s.T(), "Return", s.runner)
Expand All @@ -219,7 +221,7 @@ func (s *ExecuteInteractivelyTestSuite) TestDestroysRunnerAfterTimeoutAndSignal(
func (s *ExecuteInteractivelyTestSuite) TestResetTimerGetsCalled() {
executionRequest := &dto.ExecutionRequest{}
s.runner.StoreExecution(defaultExecutionID, executionRequest)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, nil, nil, nil)
_, _, err := s.runner.ExecuteInteractively(defaultExecutionID, nil, nil, nil, context.Background())
s.Require().NoError(err)
s.timer.AssertCalled(s.T(), "ResetTimeout")
}
Expand All @@ -228,7 +230,8 @@ func (s *ExecuteInteractivelyTestSuite) TestExitHasTimeoutErrorIfRunnerTimesOut(
s.mockedTimeoutPassedCall.Return(true)
executionRequest := &dto.ExecutionRequest{}
s.runner.StoreExecution(defaultExecutionID, executionRequest)
exitChannel, _, err := s.runner.ExecuteInteractively(defaultExecutionID, &nullio.ReadWriter{}, nil, nil)
exitChannel, _, err := s.runner.ExecuteInteractively(
defaultExecutionID, &nullio.ReadWriter{}, nil, nil, context.Background())
s.Require().NoError(err)
exit := <-exitChannel
s.Equal(ErrorRunnerInactivityTimeout, exit.Err)
Expand Down
1 change: 1 addition & 0 deletions internal/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Runner interface {
stdin io.ReadWriter,
stdout,
stderr io.Writer,
ctx context.Context,
) (exit <-chan ExitInfo, cancel context.CancelFunc, err error)

// ListFileSystem streams the listing of the file system of the requested directory into the Writer provided.
Expand Down
18 changes: 9 additions & 9 deletions internal/runner/runner_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4550a45

Please sign in to comment.