Skip to content

Commit

Permalink
Call InUse.Increment from reconciliation goroutine
Browse files Browse the repository at this point in the history
Signed-off-by: Alper Rifat Ulucinar <[email protected]>
  • Loading branch information
ulucinar committed Mar 27, 2023
1 parent 967a3a5 commit 616ea71
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 71 deletions.
83 changes: 50 additions & 33 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
Expand Down Expand Up @@ -47,11 +48,10 @@ func WithCallbackProvider(ac CallbackProvider) Option {
}
}

// WithProviderScheduler sets the native Terraform provider scheduler to be used
// by a Connector.
func WithProviderScheduler(s terraform.ProviderScheduler) Option {
// WithLogger configures a logger for the Connector.
func WithLogger(l logging.Logger) Option {
return func(c *Connector) {
c.providerScheduler = s
c.logger = l
}
}

Expand All @@ -62,7 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi
getTerraformSetup: sf,
store: ws,
config: cfg,
providerScheduler: terraform.NewNoOpProviderScheduler(),
logger: logging.NewNopLogger(),
}
for _, f := range opts {
f(c)
Expand All @@ -78,8 +78,7 @@ type Connector struct {
getTerraformSetup terraform.SetupFn
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

// Connect makes sure the underlying client is ready to issue requests to the
Expand All @@ -94,56 +93,62 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
if err != nil {
return nil, errors.Wrap(err, errGetTerraformSetup)
}
if ts.Scheduler != nil {
c.providerScheduler = ts.Scheduler
}

ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
if err != nil {
return nil, errors.Wrap(err, errGetWorkspace)
}
if err := c.scheduleProvider(ws); err != nil {
return nil, errors.Wrap(err, errScheduleProvider)
}
return &external{
workspace: ws,
config: c.config,
callback: c.callback,
workspace: ws,
config: c.config,
callback: c.callback,
providerScheduler: ts.Scheduler,
providerHandle: ws.ProviderHandle,
logger: c.logger.WithValues("uid", mg.GetUID()),
}, nil
}

func (c *Connector) scheduleProvider(ws *terraform.Workspace) error {
if c.providerScheduler == nil || ws == nil {
type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

func (e *external) scheduleProvider() error {
if e.providerScheduler == nil || e.workspace == nil {
return nil
}
inuse, attachmentConfig, err := c.providerScheduler.Start(ws.ProviderHandle)
inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
if err != nil {
return errors.Wrap(err, "cannot schedule a shared provider for the workspace")
return errors.Wrap(err, errScheduleProvider)
}
if ps, ok := e.workspace.(ProviderSharer); ok {
ps.UseProvider(inuse, attachmentConfig)
}
ws.UseSharedProvider(inuse, attachmentConfig)
c.providerHandle = ws.ProviderHandle
return nil
}

// Disconnect releases any resources held by the Connector.
func (c *Connector) Disconnect(_ context.Context) error {
if c.providerScheduler == nil {
return nil
func (e *external) stopProvider() {
if e.providerScheduler == nil {
return
}
if err := e.providerScheduler.Stop(e.providerHandle); err != nil {
e.logger.Info("ExternalClient failed to stop the native provider", "error", err)
}
return errors.Wrap(c.providerScheduler.Stop(c.providerHandle), "cannot stop the shared provider for the workspace")
}

type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
}

func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo
// We skip the gocyclo check because most of the operations are straight-forward
// and serial.
// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
// increasing the difficulty of understanding the flow.
if err := e.scheduleProvider(); err != nil {
return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
}
defer e.stopProvider()
tr, ok := mg.(resource.Terraformed)
if !ok {
return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
Expand Down Expand Up @@ -258,6 +263,10 @@ func addTTR(mg xpresource.Managed) {
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand Down Expand Up @@ -285,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand All @@ -304,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
if err := e.scheduleProvider(); err != nil {
return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type Workspace interface {
Plan(context.Context) (terraform.PlanResult, error)
}

// ProviderSharer shares a native provider process with the receiver.
type ProviderSharer interface {
UseProvider(inuse terraform.InUse, attachmentConfig string)
}

// Store is where we can get access to the Terraform workspace of given resource.
type Store interface {
Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error)
Expand Down
22 changes: 0 additions & 22 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,6 @@ const (
promSysResource = "resource"
)

// ExecMode is the Terraform CLI execution mode label
type ExecMode int

const (
// ModeSync represents the synchronous execution mode
ModeSync ExecMode = iota
// ModeASync represents the asynchronous execution mode
ModeASync
)

// String converts an execMode to string
func (em ExecMode) String() string {
switch em {
case ModeSync:
return "sync"
case ModeASync:
return "async"
default:
return "unknown"
}
}

var (
// CLITime is the Terraform CLI execution times histogram.
CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
}
r := managed.NewReconciler(mgr,
xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind),
managed.WithExternalConnectDisconnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"],
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))),
{{- end}}
Expand Down
4 changes: 4 additions & 0 deletions pkg/terraform/provider_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig)
return sr.reattachConfig, nil
}
log.Debug("Provider runner not yet started. Will fork a new native provider.")
errCh := make(chan error, 1)
reattachCh := make(chan string, 1)
sr.stopCh = make(chan bool, 1)
Expand All @@ -189,6 +190,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
errCh <- err
return
}
log.Debug("Forked new native provider.")
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
t := scanner.Text()
Expand All @@ -211,6 +213,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
errCh <- err
case <-sr.stopCh:
cmd.Stop()
log.Debug("Stopped the provider runner.")
}
}()

Expand All @@ -229,6 +232,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo
func (sr *SharedProvider) Stop() error {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.logger.Debug("Attempting to stop the provider runner.")
if sr.stopCh == nil {
return errors.New("shared provider process not started yet")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/terraform/provider_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOptio
}

func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) {
s.logger.Debug("Starting workspace scoped shared provider runner.", "handle", h)
s.logger.Debug("Starting workspace scoped provider runner.", "handle", h)
reattachConfig, err := s.runner.Start()
return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner")
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/terraform/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func getSortedKeyValuePairs(parent string, m map[string]any) []string {
cArr = append(cArr, getSortedKeyValuePairs(fmt.Sprintf("%s%s[%d].", parent, k, i), e)...)
}
result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(cArr, ",")))
case *string:
if t != nil {
result = append(result, fmt.Sprintf("%q:%q", parent+k, *t))
}
default:
result = append(result, fmt.Sprintf("%q:%q", parent+k, t))
}
Expand Down Expand Up @@ -240,7 +244,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient
return nil, errors.Wrap(err, "cannot write main tf file")
}
if isNeedProviderUpgrade {
out, err := w.runTF(ctx, metrics.ModeSync, "init", "-upgrade", "-input=false")
out, err := w.runTF(ctx, ModeSync, "init", "-upgrade", "-input=false")
w.logger.Debug("init -upgrade ended", "out", ts.filterSensitiveInformation(string(out)))
if err != nil {
return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out)))
Expand All @@ -257,7 +261,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient
if !os.IsNotExist(err) {
return w, nil
}
out, err := w.runTF(ctx, metrics.ModeSync, "init", "-input=false")
out, err := w.runTF(ctx, ModeSync, "init", "-input=false")
w.logger.Debug("init ended", "out", ts.filterSensitiveInformation(string(out)))
return w, errors.Wrapf(err, "cannot init workspace: %s", ts.filterSensitiveInformation(string(out)))
}
Expand All @@ -279,7 +283,7 @@ func (ws *WorkspaceStore) Remove(obj xpresource.Object) error {
}

func (ws *WorkspaceStore) initMetrics() {
for _, mode := range []metrics.ExecMode{metrics.ModeSync, metrics.ModeASync} {
for _, mode := range []ExecMode{ModeSync, ModeASync} {
for _, subcommand := range []string{"init", "apply", "destroy", "plan"} {
metrics.CLIExecutions.WithLabelValues(subcommand, mode.String()).Set(0)
}
Expand Down
Loading

0 comments on commit 616ea71

Please sign in to comment.