Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: add config generation #822

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ RUN curl -o /usr/bin/minio https://dl.min.io/server/minio/release/linux-$TARGETA
&& mc --version

COPY ./scripts /usr/local/bin
COPY ./config/full-stack.json /etc/livepeer/full-stack.json

ENV CATALYST_DOWNLOADER_PATH=/usr/local/bin \
CATALYST_DOWNLOADER_MANIFEST=/etc/livepeer/manifest.yaml \
Expand All @@ -134,7 +133,7 @@ ENV CATALYST_DOWNLOADER_PATH=/usr/local/bin \

RUN mkdir /data

CMD ["/usr/local/bin/catalyst", "--", "/usr/local/bin/MistController", "-c", "/etc/livepeer/full-stack.json"]
CMD ["/usr/local/bin/catalyst"]

FROM ${FROM_LOCAL_PARENT} AS box-local

Expand Down
13 changes: 12 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,16 @@ scripts:
cp -Rv ./scripts/* ./bin

.PHONY: box-dev
box-dev: scripts
box-dev: scripts catalyst
ulimit -c unlimited \
&& exec docker run \
-v $$(realpath bin):/usr/local/bin \
-v $$(realpath data):/data \
-v $$(realpath config):/etc/livepeer:ro \
-v $$(realpath ./coredumps):$$(realpath ./coredumps) \
-e CORE_DUMP_DIR=$$(realpath ./coredumps) \
-v /home/iameli/.ethereum/keystore:/keystore \
-e CATALYST_SECRET=f61b3cdb-d173-4a7a-a0d3-547b871a56f9 \
$(shell for line in $$(cat .env 2>/dev/null || echo ''); do printf -- "-e $$line "; done) \
--rm \
-it \
Expand Down Expand Up @@ -246,3 +248,12 @@ snapshot:
&& cd data \
&& rm -rf cockroach/auxiliary/EMERGENCY_BALLAST \
&& tar czvf ../livepeer-studio-bootstrap.tar.gz cockroach

.PHONY: sql-schema-dump
sql-schema-dump:
cockroach sql --format=raw \
--url 'postgresql://root@localhost:5432/defaultdb?sslmode=disable' \
--execute "show create all tables" \
| grep -v '#' \
| grep -v 'Time' \
> ./config/full-stack.sql
77 changes: 61 additions & 16 deletions cmd/catalyst/catalyst.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package main

import (
"flag"
"os"
"os/exec"
"syscall"

"github.com/livepeer/catalyst/cmd/downloader/cli"
"github.com/livepeer/catalyst/cmd/downloader/downloader"
"github.com/livepeer/catalyst/cmd/downloader/types"
glog "github.com/magicsong/color-glog"
"github.com/golang/glog"
"github.com/livepeer/catalyst/cmd/catalyst/config"
"github.com/livepeer/catalyst/cmd/downloader/constants"
"github.com/peterbourgon/ff/v3"
)

var Version = "undefined"
Expand All @@ -16,26 +18,69 @@ var Version = "undefined"
// but that one will stay just a downloader and this binary may gain other functionality

func main() {
cliFlags, err := cli.GetCliFlags(types.BuildFlags{Version: Version})
cli := config.Cli{}
flag.Set("logtostderr", "true")
// vFlag := flag.Lookup("v")
fs := flag.NewFlagSet(constants.AppName, flag.ExitOnError)
// fs.StringVar(&cli.Verbosity, "v", "3", "Log verbosity. Integer value from 0 to 9")
fs.StringVar(&cli.PublicURL, "public-url", "http://localhost:8888", "Public-facing URL of your Catalyst node, including protocol and port")
fs.StringVar(&cli.Secret, "secret", "", "Secret UUID to secure your Catalyst node")
fs.StringVar(&cli.ConfOutput, "conf-output", "/tmp/catalyst-generated.json", "Path where we will place generated MistServer configuration")
fs.StringVar(&cli.SQLOutput, "sql-output", "/tmp/catalyst-fixtures.sql", "Path where we will generate SQL fixtures")
fs.StringVar(&cli.Network, "network", "offchain", "Network to use for transcoding. Allowed values: offchain, arbitrum-one-mainnet")
fs.StringVar(&cli.EthURL, "eth-url", "", "HTTPS URL of an Ethereum RPC provider for your selected network")
fs.StringVar(&cli.EthKeystorePath, "eth-keystore-path", "/keystore", "Path to an Ethereum keystore")
fs.StringVar(&cli.EthPassword, "eth-password", "", "Ethereum password or path to password file")
fs.StringVar(&cli.MaxTicketEV, "max-ticket-ev", "50000000001", "The maximum acceptable expected value for one PM ticket")
fs.StringVar(&cli.MaxTotalEV, "max-total-ev", "20000000000000", "The maximum acceptable expected value for one PM payment")
fs.StringVar(&cli.MaxPricePerUnit, "max-price-per-unit", "700", "The maximum transcoding price (in wei) per 'pixelsPerUnit' a broadcaster is willing to accept. If not set explicitly, broadcaster is willing to accept ANY price")

ff.Parse(
fs, os.Args[1:],
ff.WithConfigFileFlag("config"),
ff.WithConfigFileParser(ff.PlainParser),
ff.WithEnvVarPrefix("CATALYST"),
ff.WithEnvVarSplit(","),
)
flag.CommandLine.Parse(nil)
conf, sql, err := config.GenerateConfig(&cli)
if err != nil {
panic(err)
}
err = os.WriteFile(cli.ConfOutput, conf, 0600)
if err != nil {
glog.Fatalf("error parsing cli flags: %s", err)
return
panic(err)
}
err = downloader.Run(cliFlags)
err = os.WriteFile(cli.SQLOutput, sql, 0600)
if err != nil {
glog.Fatalf("error running downloader: %s", err)
panic(err)
}
execNext(cliFlags)
execNext(cli)
}

// Archiving for when we want to introduce auto-updating:

// func main() {
// cliFlags, err := cli.GetCliFlags(types.BuildFlags{Version: Version})
// if err != nil {
// glog.Fatalf("error parsing cli flags: %s", err)
// return
// }
// err = downloader.Run(cliFlags)
// if err != nil {
// glog.Fatalf("error running downloader: %s", err)
// }
// execNext(cliFlags)
// }

// Done! Move on to the provided next application, if it exists.
func execNext(cliFlags types.CliFlags) {
if len(cliFlags.ExecCommand) == 0 {
// Nothing to do.
return
func execNext(cli config.Cli) {
fname, err := exec.LookPath("MistController")
if err != nil {
glog.Fatalf("error finding MistController: %s", fname)
}
glog.Infof("downloader complete, now we will exec %v", cliFlags.ExecCommand)
execErr := syscall.Exec(cliFlags.ExecCommand[0], cliFlags.ExecCommand, os.Environ())
glog.Infof("config file written, now we will exec MistController")
execErr := syscall.Exec(fname, []string{"MistController", "-c", cli.ConfOutput}, os.Environ())
if execErr != nil {
glog.Fatalf("error running next command: %s", execErr)
}
Expand Down
223 changes: 223 additions & 0 deletions cmd/catalyst/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package config

import (
_ "embed"
"encoding/json"
"fmt"
"net/url"
"strings"

"github.com/doug-martin/goqu/v9"
)

//go:embed full-stack.json
var fullstack []byte

//go:embed full-stack.sql
var sqlTables string

var adminID = "00000000-0000-4000-0000-000000000000"
var recordingBucketID = "00000000-0000-4000-0000-000000000001"
var vodBucketID = "00000000-0000-4000-0000-000000000002"
var vodBucketCatalystID = "00000000-0000-4000-0000-000000000003"
var privateBucketID = "00000000-0000-4000-0000-000000000004"

type Cli struct {
PublicURL string
Secret string
Verbosity string
ConfOutput string
SQLOutput string
Network string
EthURL string
EthKeystorePath string
EthPassword string
MaxTicketEV string
MaxTotalEV string
MaxPricePerUnit string
}

type DBObject map[string]any

func (d DBObject) Table() string {
switch d["kind"] {
case "user":
return "users"
case "api-token":
return "api_token"
case "object-store":
return "object_store"
}
panic("table not found")
}

func GenerateConfig(cli *Cli) ([]byte, []byte, error) {
if cli.Secret == "" {
return []byte{}, []byte{}, fmt.Errorf("CATALYST_SECRET parameter is required")
}
u, err := url.Parse(cli.PublicURL)
if err != nil {
return []byte{}, []byte{}, err
}
var conf MistConfig
err = json.Unmarshal(fullstack, &conf)
if err != nil {
return []byte{}, []byte{}, err
}

inserts := []DBObject{}

admin := DBObject{
"id": adminID,
"firstName": "Root",
"lastName": "User",
"admin": true,
"createdAt": 0,
"email": "[email protected]",
"emailValid": true,
"emailValidToken": "00000000-0000-4000-0000-000000000000",
"kind": "user",
"lastSeen": 1694546853946,
"password": "0000000000000000000000000000000000000000000000000000000000000000",
"salt": "0000000000000000",
}
apiToken := DBObject{
"name": "ROOT KEY DON'T DELETE",
"createdAt": 0,
"id": cli.Secret,
"kind": "api-token",
"userId": admin["id"],
}
inserts = append(inserts, admin, apiToken)

recordingBucket := ObjectStore(adminID, cli.PublicURL, recordingBucketID, "os-recordings")

vodBucket := ObjectStore(adminID, cli.PublicURL, vodBucketID, "os-vod")

vodBucketCatalyst := ObjectStore(adminID, cli.PublicURL, vodBucketCatalystID, "os-catalyst-vod")

privateBucket := ObjectStore(adminID, cli.PublicURL, privateBucketID, "os-vod")
inserts = append(inserts, recordingBucket, vodBucket, vodBucketCatalyst, privateBucket)

newProtocols := []*Protocol{}
for _, protocol := range conf.Config.Protocols {
ok := tweakProtocol(protocol, cli, u)
if ok {
newProtocols = append(newProtocols, protocol)
}
}
conf.Config.Protocols = newProtocols

video := conf.Streams["video"]
for _, process := range video.Processes {
if process.Process == "Livepeer" {
process.AccessToken = cli.Secret
}
}

var out []byte
out, err = json.MarshalIndent(conf, "", " ")
if err != nil {
return []byte{}, []byte{}, err
}

sql := strings.ReplaceAll(sqlTables, "CREATE TABLE", "CREATE TABLE IF NOT EXISTS")

for _, insert := range inserts {
obj, err := json.Marshal(insert)
if err != nil {
return []byte{}, []byte{}, err
}
ds := goqu.Insert(insert.Table()).Rows(
goqu.Record{"id": insert["id"], "data": obj},
).OnConflict(goqu.DoNothing())
insertSQL, _, err := ds.ToSQL()
if err != nil {
return []byte{}, []byte{}, err
}

sql = fmt.Sprintf("%s\n%s;", sql, insertSQL)
}

return out, []byte(sql), nil
}

// returns true if this protocol should be included
func tweakProtocol(protocol *Protocol, cli *Cli, u *url.URL) bool {
if protocol.Connector == "livepeer-api" && !protocol.StreamInfoService {
protocol.RecordCatalystObjectStoreID = recordingBucketID
protocol.VODCatalystObjectStoreID = vodBucketCatalystID
protocol.VODCatalystPrivateAssetsObjectStore = privateBucketID
protocol.VODObjectStoreID = vodBucketID
protocol.CORSJWTAllowlist = fmt.Sprintf(`["%s"]`, cli.PublicURL)
protocol.Ingest = fmt.Sprintf(
`[{"ingest":"rtmp://%s/live","ingests":{"rtmp":"rtmp://%s/live","srt":"srt://%s:8889"},"playback":"%s/mist/hls","base":"%s","origin":"%s"}]`,
u.Hostname(),
u.Hostname(),
u.Hostname(),
cli.PublicURL,
cli.PublicURL,
cli.PublicURL,
)
} else if protocol.Connector == "livepeer-catalyst-api" {
protocol.APIToken = cli.Secret
protocol.Tags = fmt.Sprintf("node=media,http=%s/mist,https=%s/mist", cli.PublicURL, cli.PublicURL)
} else if protocol.Connector == "livepeer-task-runner" {
protocol.CatalystSecret = cli.Secret
protocol.LivepeerAccessToken = cli.Secret
} else if protocol.Connector == "livepeer-analyzer" {
protocol.LivepeerAccessToken = cli.Secret
} else if protocol.Connector == "livepeer" && protocol.Broadcaster {
// both broadcasters
if cli.Network != "offchain" {
protocol.Network = cli.Network
protocol.EthKeystorePath = cli.EthKeystorePath
protocol.EthPassword = cli.EthPassword
protocol.EthURL = cli.EthURL
protocol.MaxPricePerUnit = cli.MaxPricePerUnit
protocol.MaxTicketEV = cli.MaxTicketEV
protocol.MaxTotalEV = cli.MaxTotalEV
} else {
protocol.OrchAddr = "http://127.0.0.1:8936"
}
} else if protocol.Connector == "livepeer" && protocol.Broadcaster && protocol.MetadataQueueURI != "" {
// live broadcaster
protocol.AuthWebhookURL = fmt.Sprintf("http://%s:%[email protected]:3004/api/stream/hook", adminID, cli.Secret)
} else if protocol.Connector == "livepeer" && protocol.Orchestrator {
// if we're not offchain we shouldn't run a local O
if cli.Network != "offchain" {
return false
}
} else if protocol.Connector == "WebRTC" {
protocol.ICEServers = []ICEServer{
{
URLs: fmt.Sprintf("stun:%s:3478", u.Hostname()),
},
{
Credential: "livepeer",
URLs: fmt.Sprintf("turn:%s:3478", u.Hostname()),
Username: "livepeer",
},
{
URLs: fmt.Sprintf("stun:%s:5349", u.Hostname()),
},
{
Credential: "livepeer",
URLs: fmt.Sprintf("turn:%s:5349", u.Hostname()),
Username: "livepeer",
},
}
}
return true
}

func ObjectStore(userID, publicURL, id, bucket string) DBObject {
return DBObject{
"createdAt": 0,
"id": id,
"publicUrl": fmt.Sprintf("%s/%s", publicURL, bucket),
"url": fmt.Sprintf("s3+http://admin:[email protected]:9000/%s", bucket),
"userId": userID,
"kind": "object-store",
}
}
Loading
Loading