forked from Safecast/safestream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
http-stream.go
149 lines (125 loc) · 3.51 KB
/
http-stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2021 Safecast. All rights reserved.
// Use of this source code is governed by licenses granted by the
// copyright holder including that found in the LICENSE file.
package main
import (
"fmt"
"net/http"
"strings"
"text/template"
"time"
"github.com/gorilla/websocket"
)
// WS support
var upgrader = websocket.Upgrader{} // use default options
// The main stream template
var streamTemplate *template.Template
// Initialize for stream processing
func streamInit() (err error) {
var contents []byte
contents, err = resourceRead("main.html")
if err != nil {
return
}
streamTemplate = template.Must(template.New("").Parse(string(contents)))
return
}
// Handle http requests to the root url
func streamLaunch(rsp http.ResponseWriter, req *http.Request) {
isTLS := req.Header.Get("X-Forwarded-Proto") == "https"
scheme := "ws:"
if isTLS {
scheme = "wss:"
}
url := scheme + "//" + req.Host + httpTopicStream1 + req.URL.String()
streamTemplate.Execute(rsp, url)
}
// Handle ws template
func httpStreamHandler(rsp http.ResponseWriter, req *http.Request) {
// Get the args
target, args, err := httpArgs(req, "")
if err != nil {
return
}
if strings.HasSuffix(target, "/") {
target = strings.TrimSuffix(target, "/")
}
// Remove stream prefix from target, in all variations
target = "/" + target
target = strings.TrimPrefix(target, httpTopicStream2)
target = strings.TrimPrefix(target, httpTopicStream1)
target = strings.TrimPrefix(target, "/")
// Upgrade the endpoint to use websockets
c, err := upgrader.Upgrade(rsp, req, nil)
if err != nil {
fmt.Printf("upgrade: %s\n", err)
return
}
defer c.Close()
// Launch the reader
inboundExited := false
go processInbound(c, rsp, req, &inboundExited)
// Generate a unique watcher ID
requestorIPV4, _ := getRequestorIPv4(req)
watcherID := watcherCreate(requestorIPV4, target, args)
// Data watching loop
for !inboundExited {
// Get more data from the watcher
data, ipinfo, err := watcherGet(watcherID, 3*time.Second)
if err != nil {
break
}
// Write either the accumulated notification text, or the idle message,
// counting on the fact that one or the other will eventually fail when
// the HTTP client goes away
if len(data) == 0 {
s := "waiting for events\n"
err = c.WriteMessage(websocket.TextMessage, []byte(s))
if err != nil {
break
}
} else {
err = nil
for _, sd := range data {
events := filterClassify(sd, ipinfo)
for _, e := range events {
maplink := fmt.Sprintf("https://www.google.com/maps/search/?api=1&query=%f,%f", e.lat, e.lon)
inflink := fmt.Sprintf("https://tt.safecast.org/id/%s", e.device)
s := fmt.Sprintf("%s%02d %0.02f%% %s%s%s %s %s%.0f%s\n",
e.class, int(e.percent*10),
e.percent*100,
"<a href=\""+inflink+"\" target=\"_blank\">", e.name, "</a>",
e.summary,
"<a href=\""+maplink+"\" target=\"_blank\">", e.distance/1000, "km</a>")
err = c.WriteMessage(websocket.TextMessage, []byte(s))
if err != nil {
break
}
time.Sleep(250 * time.Millisecond)
}
}
if err != nil {
break
}
}
}
// Done
watcherDelete(watcherID)
return
}
// Function that proceses messages coming in from the JS client
func processInbound(c *websocket.Conn, w http.ResponseWriter, r *http.Request, exited *bool) {
fmt.Printf("inbound: enter\n")
for {
mt, message, err := c.ReadMessage()
if err != nil {
break
}
err = c.WriteMessage(mt, message)
if err != nil {
break
}
}
fmt.Printf("inbound: exit\n")
*exited = true
}