Skip to content

Commit

Permalink
added support for interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
facchettos committed Apr 8, 2024
1 parent 0b9bcfc commit b2df0db
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ concurrency:
cancel-in-progress: true

env:
VCLUSTER_VERSION: v0.20.0-alpha.2
VCLUSTER_VERSION: v0.20.0-alpha.3
VCLUSTER_SUFFIX: vcluster
VCLUSTER_NAME: vcluster
VCLUSTER_NAMESPACE: vcluster
Expand Down
10 changes: 10 additions & 0 deletions e2e/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,14 @@ var _ = ginkgo.Describe("Plugin test", func() {
WithTimeout(pollingDurationLong).
Should(gomega.BeTrue())
})

ginkgo.It("check the interceptor", func() {
// wait for secret to become synced
vPod := &corev1.Pod{}
err := f.VclusterCRClient.Get(f.Context, types.NamespacedName{Name: "stuff", Namespace: "test"}, vPod)
framework.ExpectNoError(err)

// check if secret is synced correctly
framework.ExpectEqual(vPod.Name, "definitelynotstuff")
})
})
3 changes: 3 additions & 0 deletions e2e/test_plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func main() {
plugin.MustRegister(syncers.NewMyDeploymentSyncer(ctx))
plugin.MustRegister(syncers.NewCarSyncer(ctx))
plugin.MustRegister(syncers.NewImportSecrets(ctx))
plugin.MustRegister(syncers.DummyInterceptor{})

klog.Info("finished registering the plugins")
plugin.MustStart()
}

Expand Down
56 changes: 56 additions & 0 deletions e2e/test_plugin/syncers/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package syncers

import (
"net/http"

"github.com/loft-sh/vcluster-sdk/plugin"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

v2 "github.com/loft-sh/vcluster/pkg/plugin/v2"
corev1 "k8s.io/api/core/v1"
)

var _ plugin.Interceptor = DummyInterceptor{}

type DummyInterceptor struct {
}

func (d DummyInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
scheme := runtime.NewScheme()
clientgoscheme.AddToScheme(scheme)

s := serializer.NewCodecFactory(scheme)
responsewriters.WriteObjectNegotiated(
s,
negotiation.DefaultEndpointRestrictions,
schema.GroupVersion{
Group: "",
Version: "v1"},
w,
r,
200,
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "definitelynotstuff"}},
false)
}

func (d DummyInterceptor) Name() string {
return "testinterceptor"
}

func (d DummyInterceptor) InterceptedRequests() []v2.Interceptor {
return []v2.Interceptor{
{
HandlerName: "testhandler",
APIGroups: []string{"*"},
Resources: []string{"pods"},
ResourceNames: []string{"*"},
Verbs: []string{"get"},
},
}
}
62 changes: 59 additions & 3 deletions plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"sync"

"github.com/ghodss/yaml"
Expand All @@ -22,6 +24,7 @@ import (
"github.com/loft-sh/vcluster/pkg/util/clienthelper"
contextutil "github.com/loft-sh/vcluster/pkg/util/context"
"github.com/pkg/errors"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
Expand All @@ -30,7 +33,9 @@ import (
)

func newManager() Manager {
return &manager{}
return &manager{
interceptorsHandlers: make(map[string]http.Handler),
}
}

type manager struct {
Expand All @@ -47,6 +52,10 @@ type manager struct {

syncers []syncertypes.Base

interceptorsHandlers map[string]http.Handler
interceptors []Interceptor
interceptorsPort int

proConfig v2.InitConfigPro

options Options
Expand Down Expand Up @@ -106,6 +115,7 @@ func (m *manager) InitWithOptions(options Options) (*synccontext.RegisterContext
if err != nil {
return nil, fmt.Errorf("error decoding init config %s: %w", initRequest.Config, err)
}
m.interceptorsPort = initConfig.Port

// try to change working dir
if initConfig.WorkingDir != "" {
Expand All @@ -129,6 +139,16 @@ func (m *manager) InitWithOptions(options Options) (*synccontext.RegisterContext
ctrl.SetLogger(logger)
ctx := klog.NewContext(context.Background(), logger)

go func() {
// we need to start them regardless of being the leader, since the traffic is
// directed to all replicas
err := m.startInterceptors()
if err != nil {
logger.Error(err, "error while running the http interceptors:")
os.Exit(1)
}
}()

// now create register context
virtualClusterConfig := &config.VirtualClusterConfig{}
err = json.Unmarshal(initConfig.Config, virtualClusterConfig)
Expand Down Expand Up @@ -177,7 +197,17 @@ func (m *manager) Register(syncer syncertypes.Base) error {
m.m.Lock()
defer m.m.Unlock()

m.syncers = append(m.syncers, syncer)
if int, ok := syncer.(Interceptor); ok {
for _, rule := range int.InterceptedRequests() {
if _, ok := m.interceptorsHandlers[rule.HandlerName]; ok {
return fmt.Errorf("could not add the interceptor %s because the handler name %s is already in use", int.Name(), rule.HandlerName)
}
m.interceptorsHandlers[rule.HandlerName] = int
m.interceptors = append(m.interceptors, int)
}
} else {
m.syncers = append(m.syncers, syncer)
}
return nil
}

Expand All @@ -191,6 +221,24 @@ func (m *manager) Start() error {
return nil
}

func (m *manager) startInterceptors() error {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handlerName := r.Header.Get("VCluster-Plugin-Handler-Name")
if handlerName == "" {
responsewriters.InternalError(w, r, errors.New("header VCluster-Plugin-Handler-Name wasn't set"))
return
}
interceptorHandler, ok := m.interceptorsHandlers[handlerName]
if !ok {
responsewriters.InternalError(w, r, errors.New("header VCluster-Plugin-Handler-Name had no match"))
return
}
interceptorHandler.ServeHTTP(w, r)
})

return http.ListenAndServe("127.0.0.1:"+strconv.Itoa(m.interceptorsPort), handler)
}

func (m *manager) start() error {
m.m.Lock()
defer m.m.Unlock()
Expand All @@ -206,8 +254,11 @@ func (m *manager) start() error {
return fmt.Errorf("find all hooks: %w", err)
}

// find the interceptors
interceptors := m.findAllInterceptors()

// signal we are ready
m.pluginServer.SetReady(hooks)
m.pluginServer.SetReady(hooks, interceptors, m.interceptorsPort)

// wait until we are leader to continue
<-m.pluginServer.IsLeader()
Expand Down Expand Up @@ -305,6 +356,11 @@ func (m *manager) start() error {
return nil
}

func (m *manager) findAllInterceptors() []Interceptor {
klog.Info("len of m.interceptor is : ", len(m.interceptors))
return m.interceptors
}

func (m *manager) findAllHooks() (map[types.VersionKindType][]ClientHook, error) {
// gather all hooks
hooks := map[types.VersionKindType][]ClientHook{}
Expand Down
26 changes: 22 additions & 4 deletions plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type server interface {
Serve()

// SetReady signals the plugin server the plugin is ready to start
SetReady(hooks map[types.VersionKindType][]ClientHook)
SetReady(hooks map[types.VersionKindType][]ClientHook, interceptors []Interceptor, port int)

// Initialized retrieves the initialize request
Initialized() <-chan *pluginv2.Initialize_Request
Expand All @@ -43,7 +43,9 @@ func newPluginServer() (server, error) {
type pluginServer struct {
pluginv2.UnimplementedPluginServer

hooks map[types.VersionKindType][]ClientHook
hooks map[types.VersionKindType][]ClientHook
interceptors []Interceptor
interceptorsPort int

initialized chan *pluginv2.Initialize_Request
isReady chan struct{}
Expand Down Expand Up @@ -88,8 +90,10 @@ func (p *pluginServer) IsLeader() <-chan struct{} {
return p.isLeader
}

func (p *pluginServer) SetReady(hooks map[types.VersionKindType][]ClientHook) {
func (p *pluginServer) SetReady(hooks map[types.VersionKindType][]ClientHook, interceptors []Interceptor, port int) {
p.hooks = hooks
p.interceptors = interceptors
p.interceptorsPort = port
close(p.isReady)
}

Expand Down Expand Up @@ -216,9 +220,11 @@ func (p *pluginServer) GetPluginConfig(context.Context, *pluginv2.GetPluginConfi
return nil, err
}

interceptorConfig := p.getInterceptorConfig()
// build plugin config
pluginConfig := &v2.PluginConfig{
ClientHooks: clientHooks,
ClientHooks: clientHooks,
Interceptors: interceptorConfig,
}

// marshal plugin config
Expand Down Expand Up @@ -265,6 +271,18 @@ func (p *pluginServer) getClientHooks() ([]*v2.ClientHook, error) {
return registeredHooks, nil
}

func (p *pluginServer) getInterceptorConfig() *v2.InterceptorConfig {
interceptorConfig := &v2.InterceptorConfig{
Interceptors: make([]v2.Interceptor, 0),
Port: p.interceptorsPort,
}
for _, interceptor := range p.interceptors {
interceptorConfig.Interceptors = append(interceptorConfig.Interceptors, interceptor.InterceptedRequests()...)
}

return interceptorConfig
}

var _ plugin.Plugin = &pluginServer{}

// Server always returns an error; we're only implementing the GRPCPlugin
Expand Down
11 changes: 11 additions & 0 deletions plugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"context"
"net/http"

synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context"
v2 "github.com/loft-sh/vcluster/pkg/plugin/v2"
Expand Down Expand Up @@ -58,6 +59,16 @@ type ClientHook interface {
Resource() client.Object
}

type Interceptor interface {
syncertypes.Base

// Handler is the handler that will handle the requests delegated by the syncer
http.Handler

// InterceptedRequests returns an rbac style struct which defines what to intercept
InterceptedRequests() []v2.Interceptor
}

type MutateCreateVirtual interface {
MutateCreateVirtual(ctx context.Context, obj client.Object) (client.Object, error)
}
Expand Down

0 comments on commit b2df0db

Please sign in to comment.