Skip to content
This repository has been archived by the owner on Aug 3, 2020. It is now read-only.

Commit

Permalink
Merge pull request #9 from ibuildthecloud/support-1.2
Browse files Browse the repository at this point in the history
Support 1.2
  • Loading branch information
ibuildthecloud committed Feb 26, 2016
2 parents ac4bfa4 + 0404f3d commit 7d5e19b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 31 deletions.
37 changes: 37 additions & 0 deletions kubernetesevents/change_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kubernetesevents

import (
"github.com/rancher/go-rancher/client"
"github.com/rancher/kubernetes-agent/kubernetesclient"
"github.com/rancher/kubernetes-model/model"
)

func NewChangeHandler(rancherClient *client.RancherClient, kubernetesClient *kubernetesclient.Client, kindHandled string) *ChangeHandler {
return &ChangeHandler{
rancherClient: rancherClient,
kClient: kubernetesClient,
kindHandled: kindHandled,
}
}

type ChangeHandler struct {
rancherClient *client.RancherClient
kClient *kubernetesclient.Client
kindHandled string
}

func (h *ChangeHandler) GetKindHandled() string {
return h.kindHandled
}

func (h *ChangeHandler) Handle(event model.WatchEvent) error {
_, err := h.rancherClient.Publish.Create(&client.Publish{
Name: "service.kubernetes.change",
Data: map[string]interface{}{
"type": event.Type,
"object": event.Object,
},
})

return err
}
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func main() {
Usage: "Port to configure an HTTP health check listener on",
EnvVar: "HEALTH_CHECK_PORT",
},
cli.StringSliceFlag{
Name: "watch-kind",
Value: &cli.StringSlice{"namespaces", "services", "replicationcontrollers", "pods"},
Usage: "Which k8s kinds to watch and report changes to Rancher",
},
}

app.Run(os.Args)
Expand All @@ -73,6 +78,11 @@ func launch(c *cli.Context) {
svcHandler := kubernetesevents.NewHandler(rClient, kClient, kubernetesevents.ServiceKind)
handlers := []kubernetesevents.Handler{svcHandler}

log.Info("Watching changes for kinds: ", c.StringSlice("watch-kind"))
for _, kind := range c.StringSlice("watch-kind") {
handlers = append(handlers, kubernetesevents.NewChangeHandler(rClient, kClient, kind))
}

go func(rc chan error) {
err := kubernetesevents.ConnectToEventStream(handlers, conf)
log.Errorf("Kubernetes stream listener exited with error: %s", err)
Expand Down
67 changes: 36 additions & 31 deletions rancherevents/eventhandlers/provide_lables_handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package eventhandlers

import (
log "github.com/Sirupsen/logrus"
"strings"

log "github.com/Sirupsen/logrus"
"github.com/mitchellh/mapstructure"
revents "github.com/rancher/go-machine-service/events"
"github.com/rancher/go-rancher/client"
"github.com/rancher/kubernetes-agent/kubernetesclient"
Expand Down Expand Up @@ -85,37 +86,41 @@ func (h *syncHandler) Handler(event *revents.Event, cli *client.RancherClient) e
return nil
}

func (h *syncHandler) getPod(event *revents.Event) (string, string) {
// TODO Rewrite this horror
data := event.Data
if ihm, ok := data["instanceHostMap"]; ok {
if ihmMap, ok := ihm.(map[string]interface{}); ok {
if i, ok := ihmMap["instance"]; ok {
if iMap, ok := i.(map[string]interface{}); ok {
if d, ok := iMap["data"]; ok {
if dMap, ok := d.(map[string]interface{}); ok {
if f, ok := dMap["fields"]; ok {
if fMap, ok := f.(map[string]interface{}); ok {
if labels, ok := fMap["labels"]; ok {
if lMap, ok := labels.(map[string]interface{}); ok {
if l, ok := lMap["io.kubernetes.pod.name"]; ok {
if label, ok := l.(string); ok {
parts := strings.SplitN(label, "/", 2)
if len(parts) == 2 {
return parts[0], parts[1]
}
}
}
}
}
}
}
}
}
}
}
func (h *syncHandler) getPod(event *revents.Event) (ns, name string) {
ihm := &struct {
IHM struct {
I struct {
D struct {
F struct {
Labels map[string]string `mapstructure:"labels"`
} `mapstructure:"fields"`
} `mapstructure:"data"`
} `mapstructure:"instance"`
} `mapstructure:"instanceHostMap"`
}{}

err := mapstructure.Decode(event.Data, &ihm)
if err != nil {
log.Error("Cannot parse event")
return
}

labels := ihm.IHM.I.D.F.Labels
if len(labels) == 0 {
return
}

var ok bool
if ns, ok = labels["io.kubernetes.pod.namespace"]; ok {
// version >= 1.2
name = labels["io.kubernetes.pod.name"]
} else if name, ok = labels["io.kubernetes.pod.name"]; ok {
// try to parse
parts := strings.SplitN(name, "/", 2)
if len(parts) == 2 {
ns, name = parts[0], parts[1]
}
}

return "", ""
return
}

0 comments on commit 7d5e19b

Please sign in to comment.