Skip to content

Commit

Permalink
Refactor and rewrite EAAS code entirely, add example discovery.
Browse files Browse the repository at this point in the history
  • Loading branch information
icedream committed Mar 15, 2024
1 parent 684d581 commit cca6ffa
Show file tree
Hide file tree
Showing 30 changed files with 1,205 additions and 2,795 deletions.
13 changes: 8 additions & 5 deletions beat_info_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package stagelinq

import (
"net"

"github.com/icedream/go-stagelinq/internal/messages"
"github.com/icedream/go-stagelinq/internal/socket"
)

// BeatInfo represents a received BeatInfo message.
Expand All @@ -18,7 +21,7 @@ type BeatInfoConnection struct {
beatInfoC chan *BeatInfo
}

var beatInfoConnectionMessageSet = newDeviceConnMessageSet([]message{&beatEmitMessage{}})
var beatInfoConnectionMessageSet = newDeviceConnMessageSet([]messages.Message{&beatEmitMessage{}})

func NewBeatInfoConnection(conn net.Conn, token Token) (bic *BeatInfoConnection, err error) {
msgConn := newMessageConnection(conn, beatInfoConnectionMessageSet)
Expand All @@ -34,11 +37,11 @@ func NewBeatInfoConnection(conn net.Conn, token Token) (bic *BeatInfoConnection,

// perform in-protocol service request
msgConn.WriteMessage(&serviceAnnouncementMessage{
tokenPrefixedMessage: tokenPrefixedMessage{
Token: token,
TokenPrefixedMessage: messages.TokenPrefixedMessage{
Token: messages.Token(token),
},
Service: "BeatInfo",
Port: uint16(getPort(conn.LocalAddr())),
Port: uint16(socket.GetPortFromAddress(conn.LocalAddr())),
})

go func() {
Expand All @@ -51,7 +54,7 @@ func NewBeatInfoConnection(conn net.Conn, token Token) (bic *BeatInfoConnection,
close(beatInfoConn.beatInfoC)
}()
for {
var msg message
var msg messages.Message
msg, err = msgConn.ReadMessage()
if err != nil {
return
Expand Down
205 changes: 205 additions & 0 deletions cmd/storage-discover/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package main

import (
"context"
"crypto/ed25519"
"crypto/rand"
"crypto/x509"
"flag"
"io"
"log"
"os"
"time"

"github.com/google/uuid"
"github.com/icedream/go-stagelinq/eaas"
"github.com/icedream/go-stagelinq/eaas/proto/enginelibrary"
"github.com/icedream/go-stagelinq/eaas/proto/networktrust"
"github.com/rivo/tview"
)

const (
appName = "Icedream StagelinQ Receiver"
appVersion = "0.0.0"
timeout = 15 * time.Second
)

var (
grpcURL string
hostname string
key ed25519.PrivateKey
id uuid.UUID
)

func init() {
flag.StringVar(&grpcURL, "server", "", "GRPC URL of the remote Engine Library to connect to. If empty, will discover devices instead.")
flag.Parse()

var err error
hostname, err = os.Hostname()
if err != nil {
hostname = "eaas-demo"
}

if f, err := os.Open("eaas-key.bin"); err == nil {
defer f.Close()
keyBytes, err := io.ReadAll(f)
if err != nil {
panic(err)
}
readKey, err := x509.ParsePKCS8PrivateKey(keyBytes)
if err != nil {
panic(err)
}
if edkey, ok := readKey.(ed25519.PrivateKey); !ok {
panic("eaas-key.bin is not an ed25519 private key")
} else {
key = edkey
}
}
if key == nil {
_, priv, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
panic(err)
}
keyBytes, err := x509.MarshalPKCS8PrivateKey(priv)
if err != nil {
panic(err)
}
os.WriteFile("eaas-key.bin", keyBytes, 0o600)
key = priv
}

if f, err := os.Open("eaas-id.txt"); err == nil {
defer f.Close()
keyBytes, err := io.ReadAll(f)
if err != nil {
panic(err)
}
id, err = uuid.ParseBytes(keyBytes)
if err != nil {
panic(err)
}
}
if key == nil {
id, err = uuid.NewUUID()
if err != nil {
panic(err)
}
keyBytes, err := id.MarshalBinary()
if err != nil {
panic(err)
}
os.WriteFile("eaas-id.txt", keyBytes, 0o600)
}
}

type App struct {
*tview.Application
}

func main() {
if len(grpcURL) == 0 {
runDiscovery()
return
}

runEngineLibraryUI(grpcURL)
}

func runEngineLibraryUI(grpcURL string) {
ctx := context.Background()
connection, err := eaas.DialContext(ctx, grpcURL)
if err != nil {
panic(err)
}

// pk := string(key.Public().(ed25519.PublicKey))
pk := id.String()
log.Println("Waiting for approval on the other end...")
resp, err := connection.CreateTrust(ctx, &networktrust.CreateTrustRequest{
DeviceName: &hostname,
Ed25519Pk: &pk,
})
if err != nil {
panic(err)
}
switch {
case resp.GetGranted() != nil:
log.Println("Access granted")
case resp.GetBusy() != nil:
log.Fatal("Busy")
case resp.GetDenied() != nil:
log.Fatal("Access denied")
default:
panic("unexpected response")
}

getLibraryResp, err := connection.GetLibrary(ctx, &enginelibrary.GetLibraryRequest{})
if err != nil {
panic(err)
}
var pageSize uint32 = 100
for _, playlist := range getLibraryResp.GetPlaylists() {
log.Printf("Playlist %q (%q)", playlist.GetTitle(), playlist.GetListType())

getTracksResp, err := connection.GetTracks(ctx, &enginelibrary.GetTracksRequest{
PlaylistId: playlist.Id,
PageSize: &pageSize,
})
if err != nil {
panic(err)
}
for _, track := range getTracksResp.GetTracks() {
metadata := track.GetMetadata()
if metadata == nil {
continue
}
log.Printf("\tTrack %s", metadata.String())
}
}
}

func runDiscovery() {
listener, err := eaas.ListenWithConfiguration(&eaas.ListenerConfiguration{
DiscoveryTimeout: timeout,
})
if err != nil {
panic(err)
}
defer listener.Close()

listener.SendBeaconEvery(5 * time.Second)

deadline := time.After(timeout)
foundDevices := []*eaas.Device{}

log.Printf("Listening for devices for %s", timeout)

discoveryLoop:
for {
select {
case <-deadline:
break discoveryLoop
default:
device, err := listener.Discover(timeout)
if err != nil {
log.Printf("WARNING: %s", err.Error())
continue discoveryLoop
}
if device == nil {
continue
}
// check if we already found this device before
for _, foundDevice := range foundDevices {
if foundDevice.IsEqual(device) {
continue discoveryLoop
}
}
foundDevices = append(foundDevices, device)
log.Printf("%s %q %q", device.Hostname, device.URL, device.SoftwareVersion)
}
}

log.Printf("Found devices: %d", len(foundDevices))
}
24 changes: 2 additions & 22 deletions cmd/storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/icedream/go-stagelinq/eaas/proto/enginelibrary"
"github.com/icedream/go-stagelinq/eaas/proto/networktrust"
"golang.org/x/text/encoding/unicode"
"google.golang.org/grpc"
)

Expand All @@ -29,25 +28,6 @@ const (
timeout = 5 * time.Second
)

var (
eaasMagic = []byte{'E', 'A', 'A', 'S', 0x01, 0x00}
eaasResponseMagic = []byte{'E', 'A', 'A', 'S', 0x01, 0x01}
)

var networkStringEncoding = unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM)

func writeNetworkString(w io.Writer, v string) (err error) {
converted, err := networkStringEncoding.NewEncoder().Bytes([]byte(v))
if err != nil {
return
}
if err = binary.Write(w, binary.BigEndian, uint32(len(converted))); err != nil {
return
}
_, err = w.Write(converted)
return
}

func main() {
var token [16]byte
if _, err := rand.Read(token[:]); err != nil {
Expand Down Expand Up @@ -153,10 +133,10 @@ func main() {
msg := new(bytes.Buffer)
msg.Write(eaasResponseMagic)

Check failure on line 134 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.19)

undefined: eaasResponseMagic

Check failure on line 134 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.20)

undefined: eaasResponseMagic

Check failure on line 134 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.21)

undefined: eaasResponseMagic

Check failure on line 134 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.22)

undefined: eaasResponseMagic
msg.Write(token[:])
writeNetworkString(msg, hostname)
messages.WriteNetworkString(msg, hostname)

Check failure on line 136 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.19)

undefined: messages

Check failure on line 136 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.20)

undefined: messages

Check failure on line 136 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.21)

undefined: messages

Check failure on line 136 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.22)

undefined: messages
uri := fmt.Sprintf("grpc://%s:%d", "192.168.188.120", 50010)
binary.Write(msg, binary.BigEndian, uint32(len(uri)))
writeNetworkString(msg, appVersion)
messages.WriteNetworkString(msg, appVersion)

Check failure on line 139 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.19)

undefined: messages

Check failure on line 139 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.20)

undefined: messages

Check failure on line 139 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.21)

undefined: messages

Check failure on line 139 in cmd/storage/main.go

View workflow job for this annotation

GitHub Actions / build (1.22)

undefined: messages
msg.Write([]byte{0, 0, 0, 2, 0, 0x5f}) // TODO
b := msg.Bytes()
log.Println("Sending UDP beacon\n", hex.Dump(b))
Expand Down
6 changes: 4 additions & 2 deletions device.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package stagelinq

import "net"
import (
"net"
)

// DeviceState represents a device's state in the network.
// Possible values are DevicePresent and DeviceLeaving.
Expand Down Expand Up @@ -59,7 +61,7 @@ func (device *Device) IsEqual(anotherDevice *Device) bool {
func newDeviceFromDiscovery(addr *net.UDPAddr, msg *discoveryMessage) *Device {
return &Device{
port: msg.Port,
token: msg.Token,
token: Token(msg.Token),

IP: addr.IP,
Name: msg.Source,
Expand Down
Loading

0 comments on commit cca6ffa

Please sign in to comment.