Skip to content

Commit

Permalink
Add webrtc handling to localserver
Browse files Browse the repository at this point in the history
  • Loading branch information
RebeccaMahany committed Jun 5, 2024
1 parent a90ddf5 commit efd58c5
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 20 deletions.
10 changes: 10 additions & 0 deletions ee/localserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type localServer struct {
limiter *rate.Limiter
tlsCerts []tls.Certificate
querier Querier
webrtcConn *webrtcConnectionHandler
kolideServer string
cancel context.CancelFunc

Expand Down Expand Up @@ -100,6 +101,7 @@ func New(ctx context.Context, k types.Knapsack) (*localServer, error) {
ecAuthedMux.Handle("/query.png", ls.requestQueryHandler())
ecAuthedMux.Handle("/scheduledquery", ls.requestScheduledQueryHandler())
ecAuthedMux.Handle("/scheduledquery.png", ls.requestScheduledQueryHandler())
ecAuthedMux.Handle("/webrtc", ls.webrtcHandler())

mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
Expand Down Expand Up @@ -140,6 +142,10 @@ func (ls *localServer) SetQuerier(querier Querier) {
ls.querier = querier
}

func (ls *localServer) setWebrtcConn(w *webrtcConnectionHandler) {
ls.webrtcConn = w
}

func (ls *localServer) LoadDefaultKeyIfNotSet() error {
if ls.serverKey != nil {
return nil
Expand Down Expand Up @@ -299,6 +305,10 @@ func (ls *localServer) Stop() error {
)
}

if ls.webrtcConn != nil {
ls.webrtcConn.close()
}

// Consider calling srv.Stop as a more forceful shutdown?

return nil
Expand Down
180 changes: 180 additions & 0 deletions ee/localserver/webrtc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package localserver

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"

"github.com/kolide/launcher/pkg/traces"
webrtc "github.com/pion/webrtc/v4"
)

func (ls *localServer) webrtcHandler() http.Handler {
return http.HandlerFunc(ls.webrtcHandlerFunc)
}

type (
webrtcConnectionHandler struct {
conn *webrtc.PeerConnection
slogger *slog.Logger
shutdown chan struct{}
}

webrtcRequest struct {
SessionDescription string `json:"client_session_description"`
}

webrtcResponse struct {
ClientSDP string `json:"client_sdp"`
LauncherSDP string `json:"launcher_sdp"`
}
)

func (ls *localServer) webrtcHandlerFunc(w http.ResponseWriter, r *http.Request) {
r, span := traces.StartHttpRequestSpan(r, "path", r.URL.Path)
defer span.End()

if r.Body == nil {
sendClientError(w, span, errors.New("webrtc request body is nil"))
return
}

var body webrtcRequest
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
sendClientError(w, span, fmt.Errorf("error unmarshaling request body: %w", err))
return
}

h, err := ls.newWebrtcHandler(body.SessionDescription)
if err != nil {
h.close()
sendClientError(w, span, fmt.Errorf("error creating webrtc handler: %w", err))
return
}

localSessionDescription, err := h.localDescription()
if err != nil {
h.close()
sendClientError(w, span, fmt.Errorf("error getting webrtc session description: %w", err))
return
}

// Set the conn handler on localserver so we can shut it down
ls.setWebrtcConn(h)

// TODO RM: Send localSessionDescription in callback -- for now, just logs
respBody := webrtcResponse{
ClientSDP: body.SessionDescription,
LauncherSDP: localSessionDescription,
}
ls.slogger.Log(r.Context(), slog.LevelInfo,
"got local session description",
"description", localSessionDescription,
"resp", respBody,
)

go h.run()
}

func (ls *localServer) newWebrtcHandler(sessionDescriptionRaw string) (*webrtcConnectionHandler, error) {
conn, err := webrtc.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, fmt.Errorf("creating peer connection: %w", err)
}

w := &webrtcConnectionHandler{
conn: conn,
slogger: ls.slogger.With("component", "webrtc_handler"),
shutdown: make(chan struct{}),
}

// Prepare our handlers
w.conn.OnConnectionStateChange(w.handleWebrtcConnectionStateChange)
w.conn.OnDataChannel(w.handleDataChannel)

// Extract and set remote description
remoteDescription, err := extractRemoteDescription(sessionDescriptionRaw)
if err != nil {
return nil, fmt.Errorf("extracting remote description from request: %w", err)
}
if err := w.conn.SetRemoteDescription(remoteDescription); err != nil {
return nil, fmt.Errorf("setting remote description: %w", err)
}

// Create local description
answer, err := w.conn.CreateAnswer(nil)
if err != nil {
return nil, fmt.Errorf("creating local description: %w", err)
}
if err := w.conn.SetLocalDescription(answer); err != nil {
return nil, fmt.Errorf("setting local description: %w", err)
}

return w, nil
}

func extractRemoteDescription(sessionDescriptionRaw string) (webrtc.SessionDescription, error) {
descriptionDecoded, err := base64.StdEncoding.DecodeString(sessionDescriptionRaw)
if err != nil {
return webrtc.SessionDescription{}, fmt.Errorf("decoding session description: %w", err)
}

var remoteDescription webrtc.SessionDescription
if err := json.Unmarshal(descriptionDecoded, &remoteDescription); err != nil {
return remoteDescription, fmt.Errorf("unmarshalling session description: %w", err)
}

return remoteDescription, nil
}

func (w *webrtcConnectionHandler) handleDataChannel(d *webrtc.DataChannel) {
d.OnOpen(func() {
w.slogger.Log(context.TODO(), slog.LevelInfo,
"data channel opened",
)
})

d.OnMessage(func(msg webrtc.DataChannelMessage) {
w.slogger.Log(context.TODO(), slog.LevelInfo,
"received message",
"message", string(msg.Data),
)
})
}

func (w *webrtcConnectionHandler) handleWebrtcConnectionStateChange(s webrtc.PeerConnectionState) {
w.slogger.Log(context.TODO(), slog.LevelInfo,
"peer connection state has changed",
"new_state", s.String(),
)

if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
w.shutdown <- struct{}{}
}
}

func (w *webrtcConnectionHandler) localDescription() (string, error) {
descriptionRaw, err := json.Marshal(w.conn.LocalDescription())
if err != nil {
return "", fmt.Errorf("marshalling local description: %w", err)
}

return base64.StdEncoding.EncodeToString(descriptionRaw), nil
}

func (w *webrtcConnectionHandler) run() {
<-w.shutdown
w.close()
}

func (w *webrtcConnectionHandler) close() {
w.slogger.Log(context.TODO(), slog.LevelInfo,
"shutting down",
)
w.conn.Close()
// TODO RM: This requires a refactor to be able to set ls.webrtcConn to nil after close
}
37 changes: 37 additions & 0 deletions ee/localserver/webrtc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package localserver

import (
"fmt"
"log/slog"
"testing"
"time"

"github.com/kolide/launcher/pkg/log/multislogger"
"github.com/kolide/launcher/pkg/threadsafebuffer"
"github.com/stretchr/testify/require"
)

func TestWebrtc(t *testing.T) {

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Function TestWebrtc missing the call to method parallel (paralleltest)

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (macos-latest)

Function TestWebrtc missing the call to method parallel

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

Function TestWebrtc missing the call to method parallel

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (windows-latest)

Function TestWebrtc missing the call to method parallel (paralleltest)

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Function TestWebrtc missing the call to method parallel

Check failure on line 14 in ee/localserver/webrtc_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Function TestWebrtc missing the call to method parallel (paralleltest)
// Paste from Fiddle https://jsfiddle.net/e41tgovp/
remoteSessionDescription := ""

var logBytes threadsafebuffer.ThreadSafeBuffer
slogger := multislogger.New(slog.NewJSONHandler(&logBytes, &slog.HandlerOptions{Level: slog.LevelDebug}))
ls := localServer{
slogger: slogger.Logger,
}

conn, err := ls.newWebrtcHandler(remoteSessionDescription)
require.NoError(t, err)

defer conn.close()

localSessionDescription, err := conn.localDescription()
require.NoError(t, err)

fmt.Println(localSessionDescription)

time.Sleep(1 * time.Minute)

fmt.Println(logBytes.String())
}
31 changes: 24 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/godbus/dbus/v5 v5.1.0
github.com/golang/protobuf v1.5.3
github.com/google/fscrypt v0.3.3
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.4.2
github.com/groob/plist v0.0.0-20190114192801-a99fbe489d03
github.com/knightsc/system_policy v1.1.1-0.20211029142728-5f4c0d5419cc
Expand All @@ -25,18 +25,18 @@ require (
github.com/pkg/errors v0.9.1
github.com/scjalliance/comshim v0.0.0-20190308082608-cf06d2532c4e
github.com/serenize/snaker v0.0.0-20171204205717-a683aaf2d516
github.com/stretchr/testify v1.8.4
github.com/stretchr/testify v1.9.0
github.com/theupdateframework/go-tuf v0.5.2
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.6
go.opencensus.io v0.24.0
golang.org/x/crypto v0.21.0
golang.org/x/crypto v0.22.0
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0
golang.org/x/image v0.10.0
golang.org/x/net v0.23.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.18.0
golang.org/x/sys v0.19.0
golang.org/x/text v0.14.0
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.58.3
Expand Down Expand Up @@ -68,6 +68,22 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/ice/v3 v3.0.7 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.6 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v3 v3.0.1 // indirect
github.com/pion/stun/v2 v2.0.0 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/transport/v3 v3.0.2 // indirect
github.com/pion/turn/v3 v3.0.3 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/samber/lo v1.38.1 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
Expand Down Expand Up @@ -100,13 +116,14 @@ require (
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pion/webrtc/v4 v4.0.0-beta.19
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/samber/slog-multi v1.0.2
github.com/secure-systems-lab/go-securesystemslib v0.5.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tevino/abool v1.2.0 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
Expand All @@ -116,7 +133,7 @@ require (
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
google.golang.org/protobuf v1.33.0 // indirect
google.golang.org/protobuf v1.33.0
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit efd58c5

Please sign in to comment.