diff --git a/kubernetesevents/change_handler.go b/kubernetesevents/change_handler.go new file mode 100644 index 0000000..ad7ebbc --- /dev/null +++ b/kubernetesevents/change_handler.go @@ -0,0 +1,37 @@ +package kubernetesevents + +import ( + "github.com/rancher/go-rancher/client" + "github.com/rancher/kubernetes-agent/kubernetesclient" + "github.com/rancher/kubernetes-model/model" +) + +func NewChangeHandler(rancherClient *client.RancherClient, kubernetesClient *kubernetesclient.Client, kindHandled string) *ChangeHandler { + return &ChangeHandler{ + rancherClient: rancherClient, + kClient: kubernetesClient, + kindHandled: kindHandled, + } +} + +type ChangeHandler struct { + rancherClient *client.RancherClient + kClient *kubernetesclient.Client + kindHandled string +} + +func (h *ChangeHandler) GetKindHandled() string { + return h.kindHandled +} + +func (h *ChangeHandler) Handle(event model.WatchEvent) error { + _, err := h.rancherClient.Publish.Create(&client.Publish{ + Name: "service.kubernetes.change", + Data: map[string]interface{}{ + "type": event.Type, + "object": event.Object, + }, + }) + + return err +} diff --git a/main.go b/main.go index 06d4c98..2ff3c6f 100644 --- a/main.go +++ b/main.go @@ -53,6 +53,11 @@ func main() { Usage: "Port to configure an HTTP health check listener on", EnvVar: "HEALTH_CHECK_PORT", }, + cli.StringSliceFlag{ + Name: "watch-kind", + Value: &cli.StringSlice{"namespaces", "services", "replicationcontrollers", "pods"}, + Usage: "Which k8s kinds to watch and report changes to Rancher", + }, } app.Run(os.Args) @@ -73,6 +78,11 @@ func launch(c *cli.Context) { svcHandler := kubernetesevents.NewHandler(rClient, kClient, kubernetesevents.ServiceKind) handlers := []kubernetesevents.Handler{svcHandler} + log.Info("Watching changes for kinds: ", c.StringSlice("watch-kind")) + for _, kind := range c.StringSlice("watch-kind") { + handlers = append(handlers, kubernetesevents.NewChangeHandler(rClient, kClient, kind)) + } + go func(rc chan error) { err := kubernetesevents.ConnectToEventStream(handlers, conf) log.Errorf("Kubernetes stream listener exited with error: %s", err) diff --git a/rancherevents/eventhandlers/provide_lables_handler.go b/rancherevents/eventhandlers/provide_lables_handler.go index 9babcb5..eba1399 100644 --- a/rancherevents/eventhandlers/provide_lables_handler.go +++ b/rancherevents/eventhandlers/provide_lables_handler.go @@ -1,9 +1,10 @@ package eventhandlers import ( - log "github.com/Sirupsen/logrus" "strings" + log "github.com/Sirupsen/logrus" + "github.com/mitchellh/mapstructure" revents "github.com/rancher/go-machine-service/events" "github.com/rancher/go-rancher/client" "github.com/rancher/kubernetes-agent/kubernetesclient" @@ -85,37 +86,41 @@ func (h *syncHandler) Handler(event *revents.Event, cli *client.RancherClient) e return nil } -func (h *syncHandler) getPod(event *revents.Event) (string, string) { - // TODO Rewrite this horror - data := event.Data - if ihm, ok := data["instanceHostMap"]; ok { - if ihmMap, ok := ihm.(map[string]interface{}); ok { - if i, ok := ihmMap["instance"]; ok { - if iMap, ok := i.(map[string]interface{}); ok { - if d, ok := iMap["data"]; ok { - if dMap, ok := d.(map[string]interface{}); ok { - if f, ok := dMap["fields"]; ok { - if fMap, ok := f.(map[string]interface{}); ok { - if labels, ok := fMap["labels"]; ok { - if lMap, ok := labels.(map[string]interface{}); ok { - if l, ok := lMap["io.kubernetes.pod.name"]; ok { - if label, ok := l.(string); ok { - parts := strings.SplitN(label, "/", 2) - if len(parts) == 2 { - return parts[0], parts[1] - } - } - } - } - } - } - } - } - } - } - } +func (h *syncHandler) getPod(event *revents.Event) (ns, name string) { + ihm := &struct { + IHM struct { + I struct { + D struct { + F struct { + Labels map[string]string `mapstructure:"labels"` + } `mapstructure:"fields"` + } `mapstructure:"data"` + } `mapstructure:"instance"` + } `mapstructure:"instanceHostMap"` + }{} + + err := mapstructure.Decode(event.Data, &ihm) + if err != nil { + log.Error("Cannot parse event") + return + } + + labels := ihm.IHM.I.D.F.Labels + if len(labels) == 0 { + return + } + + var ok bool + if ns, ok = labels["io.kubernetes.pod.namespace"]; ok { + // version >= 1.2 + name = labels["io.kubernetes.pod.name"] + } else if name, ok = labels["io.kubernetes.pod.name"]; ok { + // try to parse + parts := strings.SplitN(name, "/", 2) + if len(parts) == 2 { + ns, name = parts[0], parts[1] } } - return "", "" + return }