Skip to content

Commit

Permalink
Add task collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomás Senart committed Jul 31, 2015
1 parent 9e40376 commit 5f24489
Showing 1 changed file with 99 additions and 33 deletions.
132 changes: 99 additions & 33 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type collector struct {
*http.Client
name string
port int
metrics map[*prometheus.GaugeVec]func(*slave, prometheus.Gauge)
metrics map[prometheus.Collector]func(*slave, prometheus.Collector)
}

func newCollector(name string, port int, timeout time.Duration) *collector {
Expand All @@ -49,79 +49,126 @@ func newCollector(name string, port int, timeout time.Duration) *collector {
Client: &http.Client{Timeout: timeout},
name: name,
port: port,
metrics: map[*prometheus.GaugeVec]func(*slave, prometheus.Gauge){
metrics: map[prometheus.Collector]func(*slave, prometheus.Collector){
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Total slave CPUs (fractional)",
Name: "cpus",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Total.CPUs) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Total.CPUs)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Used slave CPUs (fractional)",
Name: "cpus_used",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Used.CPUs) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Used.CPUs)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Unreserved slave CPUs (fractional)",
Name: "cpus_unreserved",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Unreserved.CPUs) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Unreserved.CPUs)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Total slave memory in MB",
Name: "mem",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Total.Mem) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Total.Mem)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Used slave memory in MB",
Name: "mem_used",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Used.Mem) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Used.Mem)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Unreserved slave memory in MB",
Name: "mem_unreserved",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Unreserved.Mem) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Unreserved.Mem)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Total slave disk in MB",
Name: "disk",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Total.Disk) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Total.Disk)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Used slave disk in MB",
Name: "disk_used",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Used.Disk) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Used.Disk)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Unreserved slave disk in MB",
Name: "disk_unreserved",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(s.Unreserved.Disk) },
}, labels): func(s *slave, c prometheus.Collector) {
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(s.Unreserved.Disk)
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Total slave ports",
Name: "ports",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(float64(s.Total.Ports.size())) },
}, labels): func(s *slave, c prometheus.Collector) {
size := s.Total.Ports.size()
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(float64(size))
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Used slave ports",
Name: "ports_used",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(float64(s.Used.Ports.size())) },
}, labels): func(s *slave, c prometheus.Collector) {
size := s.Used.Ports.size()
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(float64(size))
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Unreserved slave ports",
Name: "ports_unreserved",
Namespace: "mesos",
Subsystem: "slave",
}, labels): func(s *slave, g prometheus.Gauge) { g.Set(float64(s.Unreserved.Ports.size())) },
}, labels): func(s *slave, c prometheus.Collector) {
size := s.Unreserved.Ports.size()
c.(*prometheus.GaugeVec).WithLabelValues(s.PID).Set(float64(size))
},
prometheus.NewGaugeVec(prometheus.GaugeOpts{
Help: "Slave tasks",
Name: "tasks",
Namespace: "mesos",
Subsystem: "slave",
}, nil): func(s *slave, c prometheus.Collector) {
for _, task := range s.Tasks {
labels := prometheus.Labels{
"slave": s.PID,
"executor": task.ExecutorID,
"name": task.Name,
"framework": task.FrameworkID,
}
for _, status := range task.Statuses {
labels["timestamp"] = strconv.FormatFloat(status.Timestamp, 'f', -1, 64)
labels["state"] = status.State
c.(*prometheus.GaugeVec).With(labels).Set(float64(len(s.Tasks)))
}
}
},
},
}
}
Expand Down Expand Up @@ -156,10 +203,9 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}

for _, slave := range s.Slaves {
for metric, set := range c.metrics {
m := metric.WithLabelValues(slave.PID)
set(&slave, m)
ch <- m
for c, set := range c.metrics {
set(&slave, c)
c.Collect(ch)
}
}
}
Expand Down Expand Up @@ -208,20 +254,40 @@ func (rs ranges) size() uint64 {
return sz
}

type resources struct {
CPUs float64 `json:"cpus"`
Disk float64 `json:"disk"`
Mem float64 `json:"mem"`
Ports ranges `json:"ports"`
}
type (
resources struct {
CPUs float64 `json:"cpus"`
Disk float64 `json:"disk"`
Mem float64 `json:"mem"`
Ports ranges `json:"ports"`
}

type slave struct {
PID string `json:"pid"`
Used resources `json:"used_resources"`
Unreserved resources `json:"unreserved_resources"`
Total resources `json:"resources"`
}
task struct {
Name string `json:"name"`
ID string `json:"id"`
ExecutorID string `json:"executor_id"`
FrameworkID string `json:"framework_id"`
SlaveID string `json:"slave_id"`
State string `json:"state"`
Labels []string `json:"labels"`
Resources resources `json:"resources"`
Statuses []status `json:"statuses"`
}

type state struct {
Slaves []slave `json:"slaves"`
}
status struct {
State string `json:"state"`
Timestamp float64 `json:"timestamp"`
}

slave struct {
PID string `json:"pid"`
Used resources `json:"used_resources"`
Unreserved resources `json:"unreserved_resources"`
Total resources `json:"resources"`
Tasks []task `json:"task"`
}

state struct {
Slaves []slave `json:"slaves"`
}
)

0 comments on commit 5f24489

Please sign in to comment.