Skip to content

Commit

Permalink
Add ONVIF backchannel functionality with G711 encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricve committed Nov 17, 2023
1 parent 324fffd commit dd7fcb3
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 5 deletions.
5 changes: 3 additions & 2 deletions machinery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery

go 1.19

//replace github.com/kerberos-io/joy4 v1.0.58 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.60 => ../../../../github.com/kerberos-io/joy4

// replace github.com/kerberos-io/onvif v0.0.6 => ../../../../github.com/kerberos-io/onvif

Expand All @@ -26,7 +26,7 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.60
github.com/kerberos-io/joy4 v1.0.62
github.com/kerberos-io/onvif v0.0.7
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
Expand All @@ -38,6 +38,7 @@ require (
github.com/swaggo/gin-swagger v1.5.3
github.com/swaggo/swag v1.8.9
github.com/tevino/abool v1.2.0
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359
go.mongodb.org/mongo-driver v1.7.5
gopkg.in/DataDog/dd-trace-go.v1 v1.46.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
6 changes: 4 additions & 2 deletions machinery/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.60 h1:W9LMTHw+Lgz4J9/28xCvvVebhcAioup49NqxYVmrH38=
github.com/kerberos-io/joy4 v1.0.60/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.62 h1:LsjGrss5I2UGfTovAF0icTuEcxwOPptkSqGyxeIwa40=
github.com/kerberos-io/joy4 v1.0.62/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.7 h1:LIrXjTH7G2W9DN69xZeJSB0uS3W1+C3huFO8kTqx7/A=
github.com/kerberos-io/onvif v0.0.7/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -471,6 +471,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359 h1:P9yeMx2iNJxJqXEwLtMjSwWcD2a0AlFmFByeosMZhLM=
github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359/go.mod h1:ySLGJD8AQluMQuu5JDvfJrwsBra+8iX1jFsKS8KfB2I=
github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.mongodb.org/mongo-driver v1.7.5 h1:ny3p0reEpgsR2cfA5cjgwFZg3Cv/ofFh/8jbhGtz9VI=
Expand Down
8 changes: 7 additions & 1 deletion machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ loop:
cameraConnected = "false"
}

hasBackChannel := "false"
if communication.HasBackChannel {
hasBackChannel = "true"
}

// We will formated the uptime to a human readable format
// this will be used on Kerberos Hub: Uptime -> 1 day and 2 hours.
uptimeFormatted := uptimeStart.Format("2006-01-02 15:04:05")
Expand Down Expand Up @@ -382,13 +387,14 @@ loop:
"onvif_presets": "%s",
"onvif_presets_list": %s,
"cameraConnected": "%s",
"hasBackChannel": "%s",
"numberoffiles" : "33",
"timestamp" : 1564747908,
"cameratype" : "IPCamera",
"docker" : true,
"kios" : false,
"raspberrypi" : false
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected)
}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected, hasBackChannel)

var jsonStr = []byte(object)
buffy := bytes.NewBuffer(jsonStr)
Expand Down
80 changes: 80 additions & 0 deletions machinery/src/components/Audio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package components

import (
"bufio"
"fmt"
"os"
"time"

"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/joy4/av"
"github.com/zaf/g711"
)

func GetBackChannelAudioCodec(streams []av.CodecData, communication *models.Communication) av.AudioCodecData {
for _, stream := range streams {
if stream.Type().IsAudio() {
if stream.Type().String() == "PCM_MULAW" {
pcmuCodec := stream.(av.AudioCodecData)
if pcmuCodec.IsBackChannel() {
communication.HasBackChannel = true
return pcmuCodec
}
}
}
}
return nil
}

func WriteAudioToBackchannel(infile av.DemuxCloser, streams []av.CodecData, communication *models.Communication) {
log.Log.Info("WriteAudioToBackchannel: looking for backchannel audio codec")

pcmuCodec := GetBackChannelAudioCodec(streams, communication)
if pcmuCodec != nil {
log.Log.Info("WriteAudioToBackchannel: found backchannel audio codec")

length := 0
channel := pcmuCodec.GetIndex() * 2 // This is the same calculation as Interleaved property in the SDP file.
for audio := range communication.HandleAudio {
// Encode PCM to MULAW
var bufferUlaw []byte
for _, v := range audio.Data {
b := g711.EncodeUlawFrame(v)
bufferUlaw = append(bufferUlaw, b)
}
infile.Write(bufferUlaw, channel, uint32(length))
length = (length + len(bufferUlaw)) % 65536
time.Sleep(128 * time.Millisecond)
}
}
log.Log.Info("WriteAudioToBackchannel: finished")

}

func WriteFileToBackChannel(infile av.DemuxCloser) {
// Do the warmup!
file, err := os.Open("./audiofile.bye")
if err != nil {
fmt.Println("WriteFileToBackChannel: error opening audiofile.bye file")
}
defer file.Close()

// Read file into buffer
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)

count := 0
for {
_, err := reader.Read(buffer)
if err != nil {
break
}
// Send to backchannel
fmt.Println(buffer)
infile.Write(buffer, 2, uint32(count))

count = count + 1024
time.Sleep(128 * time.Millisecond)
}
}
11 changes: 11 additions & 0 deletions machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/kerberos-io/joy4/cgo/ffmpeg"

//"github.com/youpy/go-wav"

"github.com/kerberos-io/agent/machinery/src/capture"
"github.com/kerberos-io/agent/machinery/src/cloud"
"github.com/kerberos-io/agent/machinery/src/computervision"
Expand Down Expand Up @@ -244,6 +246,9 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
go capture.HandleSubStream(subInfile, subQueue, communication)
}

// Handle processing of audio
communication.HandleAudio = make(chan models.AudioDataPartial)

// Handle processing of motion
communication.HandleMotion = make(chan models.MotionDataPartial, 1)
if subStreamEnabled {
Expand Down Expand Up @@ -285,6 +290,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
// If we reach this point, we have a working RTSP connection.
communication.CameraConnected = true

// We might have a camera with audio backchannel enabled.
// Check if we have a stream with a backchannel and is PCMU encoded.
go WriteAudioToBackchannel(infile, streams, communication)

// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
// This will go into a blocking state, once this channel is triggered
// the agent will cleanup and restart.
Expand Down Expand Up @@ -328,6 +337,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
}
close(communication.HandleMotion)
communication.HandleMotion = nil
close(communication.HandleAudio)
communication.HandleAudio = nil

// Waiting for some seconds to make sure everything is properly closed.
log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
Expand Down
6 changes: 6 additions & 0 deletions machinery/src/models/AudioData.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package models

type AudioDataPartial struct {
Timestamp int64 `json:"timestamp" bson:"timestamp"`
Data []int16 `json:"data" bson:"data"`
}
2 changes: 2 additions & 0 deletions machinery/src/models/Communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Communication struct {
HandleStream chan string
HandleSubStream chan string
HandleMotion chan MotionDataPartial
HandleAudio chan AudioDataPartial
HandleUpload chan string
HandleHeartBeat chan string
HandleLiveSD chan int64
Expand All @@ -38,4 +39,5 @@ type Communication struct {
SubDecoder *ffmpeg.VideoDecoder
Image string
CameraConnected bool
HasBackChannel bool
}
6 changes: 6 additions & 0 deletions machinery/src/models/MQTT.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ type Payload struct {
Value map[string]interface{} `json:"value"`
}

// We received a audio input
type AudioPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
Data []int16 `json:"data"`
}

// We received a recording request, we'll send it to the motion handler.
type RecordPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
Expand Down
19 changes: 19 additions & 0 deletions machinery/src/routers/mqtt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
switch payload.Action {
case "record":
go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
case "get-audio-backchannel":
go HandleAudio(mqttClient, hubKey, payload, configuration, communication)
case "get-ptz-position":
go HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
case "update-ptz-position":
Expand Down Expand Up @@ -268,6 +270,23 @@ func HandleRecording(mqttClient mqtt.Client, hubKey string, payload models.Paylo
}
}

func HandleAudio(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value

// Convert map[string]interface{} to AudioPayload
jsonData, _ := json.Marshal(value)
var audioPayload models.AudioPayload
json.Unmarshal(jsonData, &audioPayload)

if audioPayload.Timestamp != 0 {
audioDataPartial := models.AudioDataPartial{
Timestamp: audioPayload.Timestamp,
Data: audioPayload.Data,
}
communication.HandleAudio <- audioDataPartial
}
}

func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
value := payload.Value

Expand Down

0 comments on commit dd7fcb3

Please sign in to comment.