Skip to content

Commit

Permalink
Merge pull request #178 from ulucinar/scheduler
Browse files Browse the repository at this point in the history
Add terraform.ProviderScheduler
  • Loading branch information
ulucinar authored Mar 27, 2023
2 parents acc1fd6 + 76d344c commit 05c3d62
Show file tree
Hide file tree
Showing 13 changed files with 566 additions and 106 deletions.
71 changes: 63 additions & 8 deletions pkg/controller/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/logging"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
xpresource "github.com/crossplane/crossplane-runtime/pkg/resource"
"github.com/pkg/errors"
Expand All @@ -32,6 +33,7 @@ const (
errApply = "cannot apply"
errDestroy = "cannot destroy"
errStatusUpdate = "cannot update status of custom resource"
errScheduleProvider = "cannot schedule native Terraform provider process"
)

// Option allows you to configure Connector.
Expand All @@ -46,13 +48,21 @@ func WithCallbackProvider(ac CallbackProvider) Option {
}
}

// WithLogger configures a logger for the Connector.
func WithLogger(l logging.Logger) Option {
return func(c *Connector) {
c.logger = l
}
}

// NewConnector returns a new Connector object.
func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector {
c := &Connector{
kube: kube,
getTerraformSetup: sf,
store: ws,
config: cfg,
logger: logging.NewNopLogger(),
}
for _, f := range opts {
f(c)
Expand All @@ -68,6 +78,7 @@ type Connector struct {
getTerraformSetup terraform.SetupFn
config *config.Resource
callback CallbackProvider
logger logging.Logger
}

// Connect makes sure the underlying client is ready to issue requests to the
Expand All @@ -83,29 +94,61 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed
return nil, errors.Wrap(err, errGetTerraformSetup)
}

tf, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config)
if err != nil {
return nil, errors.Wrap(err, errGetWorkspace)
}

return &external{
workspace: tf,
config: c.config,
callback: c.callback,
workspace: ws,
config: c.config,
callback: c.callback,
providerScheduler: ts.Scheduler,
providerHandle: ws.ProviderHandle,
logger: c.logger.WithValues("uid", mg.GetUID()),
}, nil
}

type external struct {
workspace Workspace
config *config.Resource
callback CallbackProvider
workspace Workspace
config *config.Resource
callback CallbackProvider
providerScheduler terraform.ProviderScheduler
providerHandle terraform.ProviderHandle
logger logging.Logger
}

func (e *external) scheduleProvider() error {
if e.providerScheduler == nil || e.workspace == nil {
return nil
}
inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle)
if err != nil {
return errors.Wrap(err, errScheduleProvider)
}
if ps, ok := e.workspace.(ProviderSharer); ok {
ps.UseProvider(inuse, attachmentConfig)
}
return nil
}

func (e *external) stopProvider() {
if e.providerScheduler == nil {
return
}
if err := e.providerScheduler.Stop(e.providerHandle); err != nil {
e.logger.Info("ExternalClient failed to stop the native provider", "error", err)
}
}

func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo
// We skip the gocyclo check because most of the operations are straight-forward
// and serial.
// TODO(muvaf): Look for ways to reduce the cyclomatic complexity without
// increasing the difficulty of understanding the flow.
if err := e.scheduleProvider(); err != nil {
return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID())
}
defer e.stopProvider()
tr, ok := mg.(resource.Terraformed)
if !ok {
return managed.ExternalObservation{}, errors.New(errUnexpectedObject)
Expand Down Expand Up @@ -220,6 +263,10 @@ func addTTR(mg xpresource.Managed) {
}

func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand Down Expand Up @@ -247,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) {
if err := e.scheduleProvider(); err != nil {
return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply)
}
Expand All @@ -266,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E
}

func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error {
if err := e.scheduleProvider(); err != nil {
return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID())
}
defer e.stopProvider()
if e.config.UseAsync {
return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/upbound/upjet/pkg/terraform"
)

const (
testPath = "test/path"
)

var (
errBoom = errors.New("boom")
exampleState = &json.StateV4{
Expand Down Expand Up @@ -154,7 +158,7 @@ func TestConnect(t *testing.T) {
},
store: StoreFns{
WorkspaceFn: func(_ context.Context, _ resource.SecretClient, _ resource.Terraformed, _ terraform.Setup, _ *config.Resource) (*terraform.Workspace, error) {
return nil, nil
return terraform.NewWorkspace(testPath), nil
},
},
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type Workspace interface {
Plan(context.Context) (terraform.PlanResult, error)
}

// ProviderSharer shares a native provider process with the receiver.
type ProviderSharer interface {
UseProvider(inuse terraform.InUse, attachmentConfig string)
}

// Store is where we can get access to the Terraform workspace of given resource.
type Store interface {
Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error)
Expand Down
22 changes: 0 additions & 22 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,6 @@ const (
promSysResource = "resource"
)

// ExecMode is the Terraform CLI execution mode label
type ExecMode int

const (
// ModeSync represents the synchronous execution mode
ModeSync ExecMode = iota
// ModeASync represents the asynchronous execution mode
ModeASync
)

// String converts an execMode to string
func (em ExecMode) String() string {
switch em {
case ModeSync:
return "sync"
case ModeASync:
return "async"
default:
return "unknown"
}
}

var (
// CLITime is the Terraform CLI execution times histogram.
CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/templates/controller.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error {
}
r := managed.NewReconciler(mgr,
xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind),
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"],
managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger),
{{- if .UseAsync }}
tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))),
{{- end}}
Expand Down
10 changes: 7 additions & 3 deletions pkg/terraform/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type FileProducer struct {

// WriteMainTF writes the content main configuration file that has the desired
// state configuration for Terraform.
func (fp *FileProducer) WriteMainTF() error {
func (fp *FileProducer) WriteMainTF() (ProviderHandle, error) {
// If the resource is in a deletion process, we need to remove the deletion
// protection.
fp.parameters["lifecycle"] = map[string]bool{
Expand Down Expand Up @@ -129,9 +129,13 @@ func (fp *FileProducer) WriteMainTF() error {
}
rawMainTF, err := json.JSParser.Marshal(m)
if err != nil {
return errors.Wrap(err, "cannot marshal main hcl object")
return InvalidProviderHandle, errors.Wrap(err, "cannot marshal main hcl object")
}
return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile)
h, err := fp.Setup.Configuration.ToProviderHandle()
if err != nil {
return InvalidProviderHandle, errors.Wrap(err, "cannot get scheduler handle")
}
return h, errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile)
}

// EnsureTFState writes the Terraform state that should exist in the filesystem
Expand Down
2 changes: 1 addition & 1 deletion pkg/terraform/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestWriteMainTF(t *testing.T) {
if err != nil {
t.Errorf("cannot initialize a file producer: %s", err.Error())
}
err = fp.WriteMainTF()
_, err = fp.WriteMainTF()
if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("\n%s\nWriteMainTF(...): -want error, +got error:\n%s", tc.reason, diff)
}
Expand Down
Loading

0 comments on commit 05c3d62

Please sign in to comment.