Skip to content

Commit

Permalink
Add manager/queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
jveski committed Nov 20, 2023
1 parent d106616 commit b583600
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 2 deletions.
14 changes: 14 additions & 0 deletions internal/reconstitution/api.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
package reconstitution

import (
"context"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"

apiv1 "github.com/Azure/eno/api/v1"
)

// Reconciler is implemented by types that can reconcile individual, reconstituted resources.
type Reconciler interface {
Name() string
Reconcile(ctx context.Context, req *Request) (ctrl.Result, error)
}

// Client provides read/write access to a collection of reconstituted resources.
type Client interface {
Get(ctx context.Context, ref *ResourceRef, gen int64) (*Resource, bool)
PatchStatusAsync(ctx context.Context, req *ManifestRef, patchFn StatusPatchFn)
}

type StatusPatchFn func(*apiv1.ResourceState) bool

// ManifestRef references a particular resource manifest within a resource slice.
Expand Down
55 changes: 55 additions & 0 deletions internal/reconstitution/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package reconstitution

import (
"time"

"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
)

// New creates a new Manager, which is responsible for "reconstituting" resources
// i.e. allowing controllers to treat them as individual resources instead of their storage representation (ResourceSlice).
func New(mgr ctrl.Manager, writeBatchInterval time.Duration) (Manager, error) {
m := &reconcilerManager{
Manager: mgr,
}
m.writeBuffer = newWriteBuffer(mgr.GetClient(), writeBatchInterval, 1)
mgr.Add(m.writeBuffer)

var err error
m.reconstituter, err = newReconstituter(mgr)
if err != nil {
return nil, err
}

return m, nil
}

type Manager interface {
GetClient() Client
Add(rec Reconciler) error
}

type reconcilerManager struct {
ctrl.Manager
*reconstituter
*writeBuffer
}

func (m *reconcilerManager) GetClient() Client { return m }

func (m *reconcilerManager) Add(rec Reconciler) error {
rateLimiter := workqueue.DefaultControllerRateLimiter()
queue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
Name: rec.Name(),
})
qp := &queueProcessor{
Client: m.Manager.GetClient(),
Queue: queue,
Recon: m.reconstituter,
Handler: rec,
Logger: m.Manager.GetLogger().WithValues("controller", rec.Name()),
}
m.reconstituter.AddQueue(queue)
return m.Manager.Add(qp)
}
54 changes: 54 additions & 0 deletions internal/reconstitution/queueprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package reconstitution

import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type queueProcessor struct {
Client client.Client
Queue workqueue.RateLimitingInterface
Recon *reconstituter
Handler Reconciler
Logger logr.Logger
}

func (q *queueProcessor) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
q.Queue.ShutDown()
}()
for q.processQueueItem(ctx) {
}
return nil
}

func (q *queueProcessor) processQueueItem(ctx context.Context) bool {
item, shutdown := q.Queue.Get()
if shutdown {
return false
}
defer q.Queue.Done(item)

req := item.(*Request)
logger := q.Logger.WithValues("compositionName", req.Composition.Name, "compositionNamespace", req.Composition.Namespace, "resourceKind", req.ResourceRef.Kind, "resourceName", req.ResourceRef.Name, "resourceNamespace", req.ResourceRef.Namespace)
ctx = logr.NewContext(ctx, logger)

result, err := q.Handler.Reconcile(ctx, req)
if err != nil {
q.Queue.AddRateLimited(item)
logger.Error(err, "error while processing queue item")
return true
}
if result.RequeueAfter != 0 {
q.Queue.Forget(item)
q.Queue.AddAfter(item, result.RequeueAfter)
return true
}

q.Queue.Forget(item)
return true
}
2 changes: 0 additions & 2 deletions internal/reconstitution/reconstituter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/go-logr/logr"
)

const indexName = ".metadata.owner"

// reconstituter reconstitutes individual resources out of resource slices.
// Similar to an informer but with extra logic to handle expanding the slice resources.
type reconstituter struct {
Expand Down

0 comments on commit b583600

Please sign in to comment.