diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index acbd89a4..d0fac1d6 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -16,7 +16,7 @@ concurrency: cancel-in-progress: true env: - VCLUSTER_VERSION: v0.20.0-alpha.2 + VCLUSTER_VERSION: v0.20.0-alpha.3 VCLUSTER_SUFFIX: vcluster VCLUSTER_NAME: vcluster VCLUSTER_NAMESPACE: vcluster diff --git a/e2e/plugin/plugin.go b/e2e/plugin/plugin.go index 6159814f..a03db5ae 100644 --- a/e2e/plugin/plugin.go +++ b/e2e/plugin/plugin.go @@ -192,4 +192,14 @@ var _ = ginkgo.Describe("Plugin test", func() { WithTimeout(pollingDurationLong). Should(gomega.BeTrue()) }) + + ginkgo.It("check the interceptor", func() { + // wait for secret to become synced + vPod := &corev1.Pod{} + err := f.VclusterCRClient.Get(f.Context, types.NamespacedName{Name: "stuff", Namespace: "test"}, vPod) + framework.ExpectNoError(err) + + // check if secret is synced correctly + framework.ExpectEqual(vPod.Name, "definitelynotstuff") + }) }) diff --git a/e2e/test_plugin/main.go b/e2e/test_plugin/main.go index 3d2bb504..10c9e0f2 100644 --- a/e2e/test_plugin/main.go +++ b/e2e/test_plugin/main.go @@ -31,6 +31,9 @@ func main() { plugin.MustRegister(syncers.NewMyDeploymentSyncer(ctx)) plugin.MustRegister(syncers.NewCarSyncer(ctx)) plugin.MustRegister(syncers.NewImportSecrets(ctx)) + plugin.MustRegister(syncers.DummyInterceptor{}) + + klog.Info("finished registering the plugins") plugin.MustStart() } diff --git a/e2e/test_plugin/syncers/interceptor.go b/e2e/test_plugin/syncers/interceptor.go new file mode 100644 index 00000000..02e53bf3 --- /dev/null +++ b/e2e/test_plugin/syncers/interceptor.go @@ -0,0 +1,56 @@ +package syncers + +import ( + "net/http" + + "github.com/loft-sh/vcluster-sdk/plugin" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + + v2 "github.com/loft-sh/vcluster/pkg/plugin/v2" + corev1 "k8s.io/api/core/v1" +) + +var _ plugin.Interceptor = DummyInterceptor{} + +type DummyInterceptor struct { +} + +func (d DummyInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) { + scheme := runtime.NewScheme() + clientgoscheme.AddToScheme(scheme) + + s := serializer.NewCodecFactory(scheme) + responsewriters.WriteObjectNegotiated( + s, + negotiation.DefaultEndpointRestrictions, + schema.GroupVersion{ + Group: "", + Version: "v1"}, + w, + r, + 200, + &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "definitelynotstuff"}}, + false) +} + +func (d DummyInterceptor) Name() string { + return "testinterceptor" +} + +func (d DummyInterceptor) InterceptedRequests() []v2.Interceptor { + return []v2.Interceptor{ + { + HandlerName: "testhandler", + APIGroups: []string{"*"}, + Resources: []string{"pods"}, + ResourceNames: []string{"*"}, + Verbs: []string{"get"}, + }, + } +} diff --git a/plugin/manager.go b/plugin/manager.go index c2fc266b..802305f2 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "fmt" + "net/http" "os" + "strconv" "sync" "github.com/ghodss/yaml" @@ -22,6 +24,7 @@ import ( "github.com/loft-sh/vcluster/pkg/util/clienthelper" contextutil "github.com/loft-sh/vcluster/pkg/util/context" "github.com/pkg/errors" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" @@ -30,7 +33,9 @@ import ( ) func newManager() Manager { - return &manager{} + return &manager{ + interceptorsHandlers: make(map[string]http.Handler), + } } type manager struct { @@ -47,6 +52,10 @@ type manager struct { syncers []syncertypes.Base + interceptorsHandlers map[string]http.Handler + interceptors []Interceptor + interceptorsPort int + proConfig v2.InitConfigPro options Options @@ -106,6 +115,7 @@ func (m *manager) InitWithOptions(options Options) (*synccontext.RegisterContext if err != nil { return nil, fmt.Errorf("error decoding init config %s: %w", initRequest.Config, err) } + m.interceptorsPort = initConfig.Port // try to change working dir if initConfig.WorkingDir != "" { @@ -129,6 +139,16 @@ func (m *manager) InitWithOptions(options Options) (*synccontext.RegisterContext ctrl.SetLogger(logger) ctx := klog.NewContext(context.Background(), logger) + go func() { + // we need to start them regardless of being the leader, since the traffic is + // directed to all replicas + err := m.startInterceptors() + if err != nil { + logger.Error(err, "error while running the http interceptors:") + os.Exit(1) + } + }() + // now create register context virtualClusterConfig := &config.VirtualClusterConfig{} err = json.Unmarshal(initConfig.Config, virtualClusterConfig) @@ -177,7 +197,17 @@ func (m *manager) Register(syncer syncertypes.Base) error { m.m.Lock() defer m.m.Unlock() - m.syncers = append(m.syncers, syncer) + if int, ok := syncer.(Interceptor); ok { + for _, rule := range int.InterceptedRequests() { + if _, ok := m.interceptorsHandlers[rule.HandlerName]; ok { + return fmt.Errorf("could not add the interceptor %s because the handler name %s is already in use", int.Name(), rule.HandlerName) + } + m.interceptorsHandlers[rule.HandlerName] = int + m.interceptors = append(m.interceptors, int) + } + } else { + m.syncers = append(m.syncers, syncer) + } return nil } @@ -191,6 +221,24 @@ func (m *manager) Start() error { return nil } +func (m *manager) startInterceptors() error { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handlerName := r.Header.Get("VCluster-Plugin-Handler-Name") + if handlerName == "" { + responsewriters.InternalError(w, r, errors.New("header VCluster-Plugin-Handler-Name wasn't set")) + return + } + interceptorHandler, ok := m.interceptorsHandlers[handlerName] + if !ok { + responsewriters.InternalError(w, r, errors.New("header VCluster-Plugin-Handler-Name had no match")) + return + } + interceptorHandler.ServeHTTP(w, r) + }) + + return http.ListenAndServe("127.0.0.1:"+strconv.Itoa(m.interceptorsPort), handler) +} + func (m *manager) start() error { m.m.Lock() defer m.m.Unlock() @@ -206,8 +254,11 @@ func (m *manager) start() error { return fmt.Errorf("find all hooks: %w", err) } + // find the interceptors + interceptors := m.findAllInterceptors() + // signal we are ready - m.pluginServer.SetReady(hooks) + m.pluginServer.SetReady(hooks, interceptors, m.interceptorsPort) // wait until we are leader to continue <-m.pluginServer.IsLeader() @@ -305,6 +356,11 @@ func (m *manager) start() error { return nil } +func (m *manager) findAllInterceptors() []Interceptor { + klog.Info("len of m.interceptor is : ", len(m.interceptors)) + return m.interceptors +} + func (m *manager) findAllHooks() (map[types.VersionKindType][]ClientHook, error) { // gather all hooks hooks := map[types.VersionKindType][]ClientHook{} diff --git a/plugin/server.go b/plugin/server.go index d5bc5329..5a35c63f 100644 --- a/plugin/server.go +++ b/plugin/server.go @@ -21,7 +21,7 @@ type server interface { Serve() // SetReady signals the plugin server the plugin is ready to start - SetReady(hooks map[types.VersionKindType][]ClientHook) + SetReady(hooks map[types.VersionKindType][]ClientHook, interceptors []Interceptor, port int) // Initialized retrieves the initialize request Initialized() <-chan *pluginv2.Initialize_Request @@ -43,7 +43,9 @@ func newPluginServer() (server, error) { type pluginServer struct { pluginv2.UnimplementedPluginServer - hooks map[types.VersionKindType][]ClientHook + hooks map[types.VersionKindType][]ClientHook + interceptors []Interceptor + interceptorsPort int initialized chan *pluginv2.Initialize_Request isReady chan struct{} @@ -88,8 +90,10 @@ func (p *pluginServer) IsLeader() <-chan struct{} { return p.isLeader } -func (p *pluginServer) SetReady(hooks map[types.VersionKindType][]ClientHook) { +func (p *pluginServer) SetReady(hooks map[types.VersionKindType][]ClientHook, interceptors []Interceptor, port int) { p.hooks = hooks + p.interceptors = interceptors + p.interceptorsPort = port close(p.isReady) } @@ -216,9 +220,11 @@ func (p *pluginServer) GetPluginConfig(context.Context, *pluginv2.GetPluginConfi return nil, err } + interceptorConfig := p.getInterceptorConfig() // build plugin config pluginConfig := &v2.PluginConfig{ - ClientHooks: clientHooks, + ClientHooks: clientHooks, + Interceptors: interceptorConfig, } // marshal plugin config @@ -265,6 +271,18 @@ func (p *pluginServer) getClientHooks() ([]*v2.ClientHook, error) { return registeredHooks, nil } +func (p *pluginServer) getInterceptorConfig() *v2.InterceptorConfig { + interceptorConfig := &v2.InterceptorConfig{ + Interceptors: make([]v2.Interceptor, 0), + Port: p.interceptorsPort, + } + for _, interceptor := range p.interceptors { + interceptorConfig.Interceptors = append(interceptorConfig.Interceptors, interceptor.InterceptedRequests()...) + } + + return interceptorConfig +} + var _ plugin.Plugin = &pluginServer{} // Server always returns an error; we're only implementing the GRPCPlugin diff --git a/plugin/types.go b/plugin/types.go index e169e96e..f0d55fff 100644 --- a/plugin/types.go +++ b/plugin/types.go @@ -2,6 +2,7 @@ package plugin import ( "context" + "net/http" synccontext "github.com/loft-sh/vcluster/pkg/controllers/syncer/context" v2 "github.com/loft-sh/vcluster/pkg/plugin/v2" @@ -58,6 +59,16 @@ type ClientHook interface { Resource() client.Object } +type Interceptor interface { + syncertypes.Base + + // Handler is the handler that will handle the requests delegated by the syncer + http.Handler + + // InterceptedRequests returns an rbac style struct which defines what to intercept + InterceptedRequests() []v2.Interceptor +} + type MutateCreateVirtual interface { MutateCreateVirtual(ctx context.Context, obj client.Object) (client.Object, error) }