forked from dropbox/llama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.go
106 lines (94 loc) · 2.93 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package llama
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
)
// API represnts the HTTP server answering queries for collected data.
type API struct {
summarizer *Summarizer
server *http.Server
ts TagSet
handler *http.ServeMux
mutex sync.RWMutex
}
// InfluxHandler handles requests for InfluxDB formatted summaries.
func (api *API) InfluxHandler(rw http.ResponseWriter, request *http.Request) {
// Lock the existing summaries cache
api.summarizer.CMutex.RLock()
summaries := api.summarizer.Cache
log.Println("Found", len(summaries), "data points")
// Convert the summaries to influx datapoints
api.mutex.RLock()
ifdp := NewDataPointsFromSummaries(summaries, api.ts)
api.mutex.RUnlock()
// And unlock the cache
api.summarizer.CMutex.RUnlock()
// Convert to JSON
asJson, err := json.Marshal(ifdp)
if err != nil {
log.Println(err)
rw.WriteHeader(500)
return
}
// Send back the response
_, err = rw.Write(asJson)
HandleMinorError(err)
}
// StatusHandler acts as a back healthcheck and simply returns 200 OK.
func (api *API) StatusHandler(rw http.ResponseWriter, request *http.Request) {
fmt.Fprintf(rw, "ok")
}
// Stop will close down the server and cause Run to exit.
func (api *API) Stop() {
err := api.server.Close()
if err != nil {
log.Println("Error stopping API:", err)
}
log.Println("API Stopped")
}
// Run calls RunForever in a separate goroutine for non-blocking behavior.
func (api *API) Run() {
// This basically just exists to be consistent with the existing pattern
// while also allowing it to be run blocking if desired.
go api.RunForever()
}
// MergeUpdateTagSet combines a provided TagSet with the existing one
func (api *API) MergeUpdateTagSet(t TagSet) {
api.mutex.Lock()
// Copy new entries into the existing TagSet
// Allowing retention of existing entries, updating where needed, and adding new
for k, v := range t {
api.ts[k] = v
}
api.mutex.Unlock()
}
// RunForever sets up the handlers above and then listens for requests until
// stopped or a fatal error occurs.
//
// Calling this will block until stopped/crashed.
func (api *API) RunForever() {
// Setup the handlers
// TODO(dmar): It might be better to move this elsewhere?
api.setupHandlers()
// TODO(dmar): Better handling around if this dies or gets shutdown. Though
// if it dies, the collector is kinda useless anyways.
log.Fatal(api.server.ListenAndServe())
}
// SetupHandlers attaches the handlers above to the http server mux.
func (api *API) setupHandlers() {
api.handler.HandleFunc("/status", api.StatusHandler)
api.handler.HandleFunc("/influxdata", api.InfluxHandler)
}
// New returns an initialized API struct.
func NewAPI(s *Summarizer, t TagSet, addr string) *API {
// TODO(dmar): In the future, make these options that can be provided.
handler := http.NewServeMux()
server := &http.Server{
Addr: addr,
Handler: handler,
}
return &API{summarizer: s, ts: t, handler: handler, server: server}
}