Skip to content

Commit

Permalink
Rabbitmq plugin: connection-related metrics. (influxdata#1908)
Browse files Browse the repository at this point in the history
* Rabbitmq plugin: connection-related metrics.

* Run go fmt.
  • Loading branch information
kishorenc authored and sparrc committed Dec 13, 2016
1 parent 7558081 commit dede3e7
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 3 deletions.
69 changes: 66 additions & 3 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type RabbitMQ struct {
ClientTimeout internal.Duration `toml:"client_timeout"`

// InsecureSkipVerify bool
Nodes []string
Queues []string
Nodes []string
Queues []string
Connections []string

Client *http.Client
}
Expand Down Expand Up @@ -135,10 +136,22 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"`
}

// Connection ...
type Connection struct {
Name string
State string
Vhost string
Host string
Node string
ReceiveCount int64 `json:"recv_cnt"`
SendCount int64 `json:"send_cnt"`
SendPend int64 `json:"send_pend"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherConnections}

var sampleConfig = `
# url = "http://localhost:15672"
Expand Down Expand Up @@ -380,6 +393,42 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
errChan <- nil
}

func gatherConnections(r *RabbitMQ, acc telegraf.Accumulator, errChan chan error) {
// Gather information about connections
connections := make([]Connection, 0)
err := r.requestJSON("/api/connections", &connections)
if err != nil {
errChan <- err
return
}

for _, connection := range connections {
if !r.shouldGatherConnection(connection) {
continue
}
tags := map[string]string{
"url": r.URL,
"connection": connection.Name,
"vhost": connection.Vhost,
"host": connection.Host,
"node": connection.Node,
}

acc.AddFields(
"rabbitmq_connection",
map[string]interface{}{
"recv_cnt": connection.ReceiveCount,
"send_cnt": connection.SendCount,
"send_pend": connection.SendPend,
"state": connection.State,
},
tags,
)
}

errChan <- nil
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 {
return true
Expand Down Expand Up @@ -408,6 +457,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
return false
}

func (r *RabbitMQ) shouldGatherConnection(connection Connection) bool {
if len(r.Connections) == 0 {
return true
}

for _, name := range r.Connections {
if name == connection.Name {
return true
}
}

return false
}

func init() {
inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{
Expand Down
64 changes: 64 additions & 0 deletions plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,57 @@ const sampleQueuesResponse = `
]
`

const sampleConnectionsResponse = `
[
{
"recv_oct": 166055,
"recv_oct_details": {
"rate": 0
},
"send_oct": 589,
"send_oct_details": {
"rate": 0
},
"recv_cnt": 124,
"send_cnt": 7,
"send_pend": 0,
"state": "running",
"channels": 1,
"type": "network",
"node": "rabbit@ip-10-0-12-133",
"name": "10.0.10.8:32774 -> 10.0.12.131:5672",
"port": 5672,
"peer_port": 32774,
"host": "10.0.12.131",
"peer_host": "10.0.10.8",
"ssl": false,
"peer_cert_subject": null,
"peer_cert_issuer": null,
"peer_cert_validity": null,
"auth_mechanism": "AMQPLAIN",
"ssl_protocol": null,
"ssl_key_exchange": null,
"ssl_cipher": null,
"ssl_hash": null,
"protocol": "AMQP 0-9-1",
"user": "workers",
"vhost": "main",
"timeout": 0,
"frame_max": 131072,
"channel_max": 65535,
"client_properties": {
"product": "py-amqp",
"product_version": "1.4.7",
"capabilities": {
"connection.blocked": true,
"consumer_cancel_notify": true
}
},
"connected_at": 1476647837266
}
]
`

func TestRabbitMQGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
Expand All @@ -385,6 +436,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
rsp = sampleNodesResponse
case "/api/queues":
rsp = sampleQueuesResponse
case "/api/connections":
rsp = sampleConnectionsResponse
default:
panic("Cannot handle request")
}
Expand Down Expand Up @@ -441,4 +494,15 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
}

assert.True(t, acc.HasMeasurement("rabbitmq_queue"))

assert.True(t, acc.HasMeasurement("rabbitmq_connection"))

connection_fields := map[string]interface{}{
"recv_cnt": int64(124),
"send_cnt": int64(7),
"send_pend": int64(0),
"state": "running",
}

acc.AssertContainsFields(t, "rabbitmq_connection", connection_fields)
}

0 comments on commit dede3e7

Please sign in to comment.