Skip to content

Commit

Permalink
feat(TKC-2844): add credentials support (#6045)
Browse files Browse the repository at this point in the history
* fix: reuse cloud connection for all operations in Toolkit
* chore: isolate Init Process constants from state implementation
* fix: error message for `secret()` function
* chore: clean up dependencies to reduce binary size
* feat(TKC-2844): add support for the `credential` function in expressions
* feat(TKC-2844): add execution ID to credential request
* feat(TKC-2844): handle fetching the credentials
* feat(TKC-2844): allow any size of credentials
* fix(TKC-2844): pass execution ID for the credentials request
* feat(TKC-2844): obfuscate credentials in Test Workflows
* chore: rename agent/client imports to agentclient
  • Loading branch information
rangoo94 authored Nov 22, 2024
1 parent 686622a commit 8266e56
Show file tree
Hide file tree
Showing 47 changed files with 794 additions and 355 deletions.
3 changes: 2 additions & 1 deletion cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kubeshop/testkube/cmd/api-server/commons"
"github.com/kubeshop/testkube/cmd/api-server/services"
"github.com/kubeshop/testkube/internal/app/api/debug"
agentclient "github.com/kubeshop/testkube/pkg/agent/client"
cloudartifacts "github.com/kubeshop/testkube/pkg/cloud/data/artifact"
cloudtestworkflow "github.com/kubeshop/testkube/pkg/cloud/data/testworkflow"
"github.com/kubeshop/testkube/pkg/event/kind/cdevent"
Expand Down Expand Up @@ -134,7 +135,7 @@ func main() {
if strings.HasPrefix(controlPlaneUrl, fmt.Sprintf("%s:%d", cfg.APIServerFullname, cfg.GRPCServerPort)) {
controlPlaneUrl = fmt.Sprintf("127.0.0.1:%d", cfg.GRPCServerPort)
}
grpcConn, err = agent.NewGRPCConnection(
grpcConn, err = agentclient.NewGRPCConnection(
ctx,
cfg.TestkubeProTLSInsecure,
cfg.TestkubeProSkipVerify,
Expand Down
5 changes: 2 additions & 3 deletions cmd/logs-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"errors"

"os"
"os/signal"
"syscall"
Expand All @@ -14,7 +13,7 @@ import (
"google.golang.org/grpc/credentials"

"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/agent"
agentclient "github.com/kubeshop/testkube/pkg/agent/client"
"github.com/kubeshop/testkube/pkg/event/bus"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/logs"
Expand Down Expand Up @@ -106,7 +105,7 @@ func main() {
switch mode {

case common.ModeAgent:
grpcConn, err := agent.NewGRPCConnection(
grpcConn, err := agentclient.NewGRPCConnection(
ctx,
cfg.TestkubeProTLSInsecure,
cfg.TestkubeProSkipVerify,
Expand Down
17 changes: 9 additions & 8 deletions cmd/testworkflow-init/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/orchestration"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/cmd/testworkflow-init/runtime"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite"
)

func Run(run lite.ActionExecute, container lite.LiteActionContainer) {
machine := data.GetInternalTestWorkflowMachine()
machine := runtime.GetInternalTestWorkflowMachine()
state := data.GetState()
step := state.GetStep(run.Ref)

Expand All @@ -32,14 +33,14 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) {

// Ensure the command is not empty
if len(command) == 0 {
output.ExitErrorf(data.CodeInputError, "command is required")
output.ExitErrorf(constants.CodeInputError, "command is required")
}

// Resolve the command to run
for i := range command {
value, err := expressions.CompileAndResolveTemplate(command[i], machine, expressions.FinalizerFail)
if err != nil {
output.ExitErrorf(data.CodeInternal, "failed to compute argument '%d': %s", i, err.Error())
output.ExitErrorf(constants.CodeInternal, "failed to compute argument '%d': %s", i, err.Error())
}
command[i], _ = value.Static().StringValue()
}
Expand All @@ -48,11 +49,11 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) {
execution := orchestration.Executions.Create(command[0], command[1:])
result, err := execution.Run()
if err != nil {
output.ExitErrorf(data.CodeInternal, "failed to execute: %v", err)
output.ExitErrorf(constants.CodeInternal, "failed to execute: %v", err)
}

// Initialize local state
var status data.StepStatus
var status constants.StepStatus

success := result.ExitCode == 0

Expand All @@ -61,11 +62,11 @@ func Run(run lite.ActionExecute, container lite.LiteActionContainer) {
success = !success
}
if result.Aborted {
status = data.StepStatusAborted
status = constants.StepStatusAborted
} else if success {
status = data.StepStatusPassed
status = constants.StepStatusPassed
} else {
status = data.StepStatusFailed
status = constants.StepStatusFailed
}

// Abandon saving execution data if the step has been finished before
Expand Down
9 changes: 4 additions & 5 deletions cmd/testworkflow-init/commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite"
"github.com/kubeshop/testkube/pkg/version"
Expand All @@ -24,7 +23,7 @@ func Setup(config lite.ActionSetup) error {
// Copy the init process
stdoutUnsafe.Print("Configuring init process...")
if config.CopyInit {
err := exec.Command("cp", "/init", data.InitPath).Run()
err := exec.Command("cp", "/init", constants.InitPath).Run()
if err != nil {
stdoutUnsafe.Error(" error\n")
stdoutUnsafe.Errorf(" failed to copy the /init process: %s\n", err.Error())
Expand All @@ -38,7 +37,7 @@ func Setup(config lite.ActionSetup) error {
// Copy the toolkit
stdoutUnsafe.Print("Configuring toolkit...")
if config.CopyToolkit {
err := exec.Command("cp", "/toolkit", data.ToolkitPath).Run()
err := exec.Command("cp", "/toolkit", constants.ToolkitPath).Run()
if err != nil {
stdoutUnsafe.Error(" error\n")
stdoutUnsafe.Errorf(" failed to copy the /toolkit utilities: %s\n", err.Error())
Expand All @@ -54,7 +53,7 @@ func Setup(config lite.ActionSetup) error {
if config.CopyBinaries {
// Use `cp` on the whole directory, as it has plenty of files, which lead to the same FS block.
// Copying individual files will lead to high FS usage
err := exec.Command("cp", "-rf", defaultInitImageBusyboxBinaryPath, data.InternalBinPath).Run()
err := exec.Command("cp", "-rf", defaultInitImageBusyboxBinaryPath, constants.InternalBinPath).Run()
if err != nil {
stdoutUnsafe.Error(" error\n")
stdoutUnsafe.Errorf(" failed to copy the binaries: %s\n", err.Error())
Expand All @@ -66,7 +65,7 @@ func Setup(config lite.ActionSetup) error {
}

// Expose debugging Pod information
stdoutUnsafe.Output(data.InitStepName, "pod", map[string]string{
stdoutUnsafe.Output(constants.InitStepName, "pod", map[string]string{
"name": os.Getenv(constants.EnvPodName),
"nodeName": os.Getenv(constants.EnvNodeName),
"namespace": os.Getenv(constants.EnvNamespaceName),
Expand Down
7 changes: 7 additions & 0 deletions cmd/testworkflow-init/constants/codes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

const (
CodeAborted uint8 = 137
CodeInputError uint8 = 155
CodeInternal uint8 = 190
)
5 changes: 5 additions & 0 deletions cmd/testworkflow-init/constants/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package constants

const (
InitStepName = "tktw-init"
)
15 changes: 15 additions & 0 deletions cmd/testworkflow-init/constants/paths.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package constants

import "path/filepath"

const (
InternalPath = "/.tktw"
TerminationLogPath = "/dev/termination-log"
)

var (
InternalBinPath = filepath.Join(InternalPath, "bin")
InitPath = filepath.Join(InternalPath, "init")
ToolkitPath = filepath.Join(InternalPath, "toolkit")
StatePath = filepath.Join(InternalPath, "state")
)
Original file line number Diff line number Diff line change
@@ -1,19 +1,4 @@
package data

import "path/filepath"

const (
InitStepName = "tktw-init"
InternalPath = "/.tktw"
TerminationLogPath = "/dev/termination-log"
)

var (
InternalBinPath = filepath.Join(InternalPath, "bin")
InitPath = filepath.Join(InternalPath, "init")
ToolkitPath = filepath.Join(InternalPath, "toolkit")
StatePath = filepath.Join(InternalPath, "state")
)
package constants

type StepStatus string

Expand Down Expand Up @@ -47,9 +32,3 @@ func StepStatusFromCode(code string) StepStatus {
}
return StepStatusAborted
}

const (
CodeAborted uint8 = 137
CodeInputError uint8 = 155
CodeInternal uint8 = 190
)
39 changes: 39 additions & 0 deletions cmd/testworkflow-init/data/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package data

import (
"context"
"sync"

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
agentclient "github.com/kubeshop/testkube/pkg/agent/client"
"github.com/kubeshop/testkube/pkg/cloud"
"github.com/kubeshop/testkube/pkg/credentials"
"github.com/kubeshop/testkube/pkg/log"
)

var (
cloudMu sync.Mutex
cloudClient cloud.TestKubeCloudAPIClient
)

func CloudClient() cloud.TestKubeCloudAPIClient {
cloudMu.Lock()
defer cloudMu.Unlock()

if cloudClient == nil {
cfg := GetState().InternalConfig.Worker.Connection
logger := log.NewSilent()
grpcConn, err := agentclient.NewGRPCConnection(context.Background(), cfg.TlsInsecure, cfg.SkipVerify, cfg.Url, "", "", "", logger)
if err != nil {
output.ExitErrorf(constants.CodeInternal, "failed to connect with the Control Plane: %s", err.Error())
}
cloudClient = cloud.NewTestKubeCloudAPIClient(grpcConn)
}
return cloudClient
}

func Credentials() credentials.CredentialRepository {
cfg := GetState().InternalConfig
return credentials.NewCredentialRepository(CloudClient(), cfg.Worker.Connection.ApiKey, cfg.Execution.Id)
}
11 changes: 6 additions & 5 deletions cmd/testworkflow-init/data/expressions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strings"

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/pkg/expressions"
)
Expand Down Expand Up @@ -68,12 +69,12 @@ var StateMachine = expressions.NewMachine().
currentStatus := GetState().CurrentStatus
expr, err := expressions.EvalExpression(currentStatus, RefNotFailedMachine, AliasMachine)
if err != nil {
output.ExitErrorf(CodeInternal, "current status is invalid: %s: %v\n", currentStatus, err.Error())
output.ExitErrorf(constants.CodeInternal, "current status is invalid: %s: %v\n", currentStatus, err.Error())
}
if passed, _ := expr.BoolValue(); passed {
return string(StepStatusPassed), true
return string(constants.StepStatusPassed), true
}
return string(StepStatusFailed), true
return string(constants.StepStatusFailed), true
} else if name == "self.status" {
state := GetState()
step := state.GetStep(state.CurrentRef)
Expand Down Expand Up @@ -123,7 +124,7 @@ var RefSuccessMachine = expressions.NewMachine().
if s.Status == nil {
return nil, false
}
return *s.Status == StepStatusPassed || *s.Status == StepStatusSkipped, true
return *s.Status == constants.StepStatusPassed || *s.Status == constants.StepStatusSkipped, true
})

var RefNotFailedMachine = expressions.NewMachine().
Expand All @@ -135,7 +136,7 @@ var RefNotFailedMachine = expressions.NewMachine().
return exp, true
}
}
return s.Status == nil || *s.Status == StepStatusPassed || *s.Status == StepStatusSkipped, true
return s.Status == nil || *s.Status == constants.StepStatusPassed || *s.Status == constants.StepStatusSkipped, true
})

func Expression(expr string, m ...expressions.Machine) (expressions.StaticValue, error) {
Expand Down
4 changes: 0 additions & 4 deletions cmd/testworkflow-init/data/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,3 @@ func GetBaseTestWorkflowMachine() expressions.Machine {
GetState() // load state
return expressions.CombinedMachines(EnvMachine, StateMachine, fileMachine)
}

func GetInternalTestWorkflowMachine() expressions.Machine {
return expressions.CombinedMachines(RefSuccessMachine, AliasMachine, GetBaseTestWorkflowMachine())
}
13 changes: 7 additions & 6 deletions cmd/testworkflow-init/data/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"sync"

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig"
Expand Down Expand Up @@ -148,14 +149,14 @@ func persistTerminationLog() {
ref = *actions[i].End
}
if actions[i].Type() == lite.ActionTypeSetup {
ref = InitStepName
ref = constants.InitStepName
}
if ref == "" {
continue
}
step := s.GetStep(ref)
if step.Status == nil {
statuses = append(statuses, fmt.Sprintf("%s,%d", StepStatusAborted, CodeAborted))
statuses = append(statuses, fmt.Sprintf("%s,%d", constants.StepStatusAborted, constants.CodeAborted))
} else {
statuses = append(statuses, fmt.Sprintf("%s,%d", (*step.Status).Code(), step.ExitCode))
}
Expand All @@ -168,9 +169,9 @@ func persistTerminationLog() {
prevTerminationLog = statuses

// Write the termination log
err := os.WriteFile(TerminationLogPath, []byte(strings.Join(statuses, "/")), 0)
err := os.WriteFile(constants.TerminationLogPath, []byte(strings.Join(statuses, "/")), 0)
if err != nil {
output.UnsafeExitErrorf(CodeInternal, "failed to save the termination log: %s", err.Error())
output.UnsafeExitErrorf(constants.CodeInternal, "failed to save the termination log: %s", err.Error())
}
}

Expand All @@ -181,7 +182,7 @@ func GetState() *state {
defer loadStateMu.Unlock()
loadStateMu.Lock()
if !loadedState {
readState(StatePath)
readState(constants.StatePath)
loadedState = true
}
return currentState
Expand All @@ -192,6 +193,6 @@ func SaveTerminationLog() {
}

func SaveState() {
persistState(StatePath)
persistState(constants.StatePath)
persistTerminationLog()
}
Loading

0 comments on commit 8266e56

Please sign in to comment.