Skip to content

Commit

Permalink
Merge pull request #16 from FabianKramm/hooks
Browse files Browse the repository at this point in the history
refactor: improve hook & plugin registration
  • Loading branch information
FabianKramm authored May 27, 2022
2 parents 111f13c + 3c49a4d commit 2490d11
Show file tree
Hide file tree
Showing 11 changed files with 410 additions and 287 deletions.
7 changes: 7 additions & 0 deletions hook/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// ClientHook tells the sdk that this action watches on certain vcluster requests and wants
// to mutate these. The objects this action wants to watch can be defined through the
// Resource() function that returns a new object of the type to watch. By implementing
// the defined interfaces below it is possible to watch on:
// Create, Update (includes patch requests), Delete and Get requests.
// This makes it possible to change incoming or outgoing objects on the fly, without the
// need to completely replace a vanilla vcluster syncer.
type ClientHook interface {
syncer.Base

Expand Down
66 changes: 45 additions & 21 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Options struct {
ListenAddress string
}

type LeaderElectionHook func(ctx context.Context) error

type Manager interface {
// Init creates a new plugin context and will block until the
// vcluster container instance could be contacted.
Expand Down Expand Up @@ -101,6 +103,7 @@ type manager struct {
started bool
syncers []syncer.Base

name string
address string
context *synccontext.RegisterContext

Expand All @@ -125,6 +128,7 @@ func (m *manager) InitWithOptions(name string, opts Options) (*synccontext.Regis
m.initialized = true

log := log.New("plugin")
m.name = name
m.address = "localhost:10099"
if opts.ListenAddress != "" {
m.address = opts.ListenAddress
Expand All @@ -142,10 +146,7 @@ func (m *manager) InitWithOptions(name string, opts Options) (*synccontext.Regis
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

pluginContext, err = remote.NewPluginInitializerClient(conn).Register(ctx, &remote.PluginInfo{
Name: name,
Version: "v2",
})
pluginContext, err = remote.NewVClusterClient(conn).GetContext(ctx, &remote.Empty{})
if err != nil {
return false, nil
}
Expand Down Expand Up @@ -258,7 +259,7 @@ func (m *manager) Start() error {
return nil
}

func (m *manager) startHookServer(log log.Logger) error {
func (m *manager) registerPlugin(log log.Logger) error {
serverAddress := os.Getenv(PLUGIN_SERVER_ADDRESS)
if serverAddress == "" {
log.Errorf("Environment variable %s not defined, are you using an old vcluster version?", PLUGIN_SERVER_ADDRESS)
Expand Down Expand Up @@ -386,24 +387,47 @@ func (m *manager) startHookServer(log log.Logger) error {
}

// start the grpc server
log.Infof("Plugin server listening on %s", serverAddress)
lis, err := net.Listen("tcp", serverAddress)
if len(registeredHooks) > 0 {
log.Infof("Plugin server listening on %s", serverAddress)
lis, err := net.Listen("tcp", serverAddress)
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}

var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
remote.RegisterPluginServer(grpcServer, &pluginServer{
hooks: hooks,
registeredHooks: registeredHooks,
})
go func() {
err := grpcServer.Serve(lis)
if err != nil {
panic(err)
}
}()
}

// register the plugin
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

conn, err := grpc.Dial(m.address, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
return fmt.Errorf("error dialing vcluster: %v", err)
}

var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
remote.RegisterPluginServer(grpcServer, &pluginServer{
hooks: hooks,
registeredHooks: registeredHooks,
defer conn.Close()
_, err = remote.NewVClusterClient(conn).RegisterPlugin(ctx, &remote.RegisterPluginRequest{
Version: "v1",
Name: m.name,
Address: serverAddress,
ClientHooks: registeredHooks,
})
go func() {
err := grpcServer.Serve(lis)
if err != nil {
panic(err)
}
}()
if err != nil {
log.Errorf("error trying to connect to vcluster: %v", err)
return err
}

return nil
}
Expand All @@ -414,7 +438,7 @@ func (m *manager) start() error {
return fmt.Errorf("manager was already started")
}

err := m.startHookServer(log)
err := m.registerPlugin(log)
if err != nil {
return errors.Wrap(err, "start hook server")
}
Expand All @@ -429,7 +453,7 @@ func (m *manager) start() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

isLeader, err := remote.NewPluginInitializerClient(conn).IsLeader(ctx, &remote.Empty{})
isLeader, err := remote.NewVClusterClient(conn).IsLeader(ctx, &remote.Empty{})
if err != nil {
log.Errorf("error trying to connect to vcluster: %v", err)
conn.Close()
Expand Down
4 changes: 0 additions & 4 deletions plugin/plugin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ type ApiVersionKindType struct {

var _ remote.PluginServer = &pluginServer{}

func (p *pluginServer) Register(ctx context.Context, req *remote.RegisterPluginRequest) (*remote.RegisterPluginResult, error) {
return &remote.RegisterPluginResult{ClientHooks: p.registeredHooks}, nil
}

func (p *pluginServer) Mutate(ctx context.Context, req *remote.MutateRequest) (*remote.MutateResult, error) {
hooks, ok := p.hooks[ApiVersionKindType{
ApiVersion: req.ApiVersion,
Expand Down
Loading

0 comments on commit 2490d11

Please sign in to comment.