Skip to content

Commit

Permalink
Merge branch 'main' into tests-worker-selection-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
restevens402 committed Nov 12, 2024
2 parents cca860d + c58fe2a commit 2c5f819
Show file tree
Hide file tree
Showing 42 changed files with 408 additions and 424 deletions.
15 changes: 10 additions & 5 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:
- '**'
jobs:
test:

runs-on: ubuntu-latest

steps:
Expand All @@ -20,12 +21,16 @@ jobs:
with:
go-version: '^1.22'

- name: Install dependencies
run: go mod tidy
- name: Install golangci-lint
run: sudo snap install golangci-lint --classic

- name: Code formatting and linting
run: make ci-lint

- name: Run tests
run: |
make test
run: make test

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@ snippets.txt
# Build result of goreleaser
dist/
bp-todo.md

# Result of running tests with coverage
coverage.txt
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ build: contracts/node_modules

install:
@sh ./node_install.sh

run: build
@./bin/masa-node

Expand All @@ -44,16 +44,25 @@ stake: build
client: build
@./bin/masa-node-cli

# TODO: Add -race and fix race conditions
test: contracts/node_modules
@go test -coverprofile=coverage.txt -covermode=atomic -v ./...
@go test -coverprofile=coverage.txt -covermode=atomic -v -count=1 -shuffle=on ./...

ci-lint:
go mod tidy && git diff --exit-code
go mod download
go mod verify
gofmt -s -w . && git diff --exit-code
go vet ./...
golangci-lint run

clean:
@rm -rf bin

@if [ -d ~/.masa/blocks ]; then rm -rf ~/.masa/blocks; fi
@if [ -d ~/.masa/cache ]; then rm -rf ~/.masa/cache; fi
@if [ -f masa_node.log ]; then rm masa_node.log; fi

proto:
sh pkg/workers/messages/build.sh

Expand Down
76 changes: 42 additions & 34 deletions cmd/masa-node-cli/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -43,21 +42,6 @@ func handleIPAddress(multiAddr string) string {
return ""
}

// handleOpenFile reads the content of a file specified by the filename 'f' and returns it as a string.
// If the file cannot be read, the function logs a fatal error and exits the program.
// Parameters:
// - f: The name of the file to read.
// Returns:
// - A string containing the content of the file.
// func handleOpenFile(f string) string {
// dat, err := os.ReadFile(f)
// if err != nil {
// log.Print(err)
// return ""
// }
// return string(dat)
// }

// handleSaveFile writes the provided content to a file specified by the filename 'f'.
// It appends the content to the file if it already exists, or creates a new file with the content if it does not.
// The file is created with permissions set to 0755.
Expand All @@ -66,9 +50,14 @@ func handleIPAddress(multiAddr string) string {
// - content: The content to write to the file.
func handleSaveFile(f string, content string) {
file, err := os.OpenFile(f, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
file.WriteString(content + "\n")
if err != nil {
log.Println(err)
logrus.Errorf("[-] Error while opening file %s for writing: %v", f, err)
return
}

_, err = file.WriteString(content + "\n")
if err != nil {
logrus.Errorf("[-] Error while writing to file %s: %v", f, err)
return
}
}
Expand All @@ -84,7 +73,7 @@ func handleSaveFile(f string, content string) {
func handleGPT(prompt string, userMessage string) (string, error) {
key := os.Getenv("OPENAI_API_KEY")
if key == "" {
log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.")
logrus.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.")
return "", errors.New("OPENAI_API_KEY is not set")
}
client := openai.NewClient(key)
Expand All @@ -105,7 +94,7 @@ func handleGPT(prompt string, userMessage string) (string, error) {
},
)
if err != nil {
log.Print(err)
logrus.Errorf("[-] Error while getting ChatGPT completion: %v", err)
return "", err
}
return resp.Choices[0].Message.Content, nil
Expand All @@ -131,7 +120,7 @@ func handleSpeak(response string) {

req, err := http.NewRequest(http.MethodPost, os.Getenv("ELAB_URL"), bytes.NewBuffer(buf))
if err != nil {
log.Print(err)
logrus.Errorf("[-] Error while creating HTTP request to %s: %v", os.Getenv("ELAB_URL"), err)
return
}
req.Header.Set("accept", "*/*")
Expand All @@ -140,27 +129,47 @@ func handleSpeak(response string) {

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Print(err)
logrus.Errorf("[-] Error while sending HTTP POST to %s: %v", os.Getenv("ELAB_URL"), err)
return
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Print(err)
logrus.Errorf("[-] Error while reading HTTP reply from ElevenLabs API: %v", err)
return
}

// TODO: Configure filename
file, err := os.Create("output.mp3")
file.Write(bodyBytes)
if err != nil {
log.Print(err)
logrus.Errorf("[-] Error while opening output.mp3 for writing: %v", err)
return
}

_, err = file.Write(bodyBytes)
if err != nil {
logrus.Errorf("[-] Error while writing voice data to output.mp3: %v", err)
return
} else {
cmd := exec.Command("afplay", "output.mp3")
go cmd.Run()
go handleTranscribe("output.mp3", "transcription.txt")
}

// TODO: Is afplay available in all platforms? Perhaps configure?
cmd := exec.Command("afplay", "output.mp3")
go func() {
err := cmd.Run()
if err != nil {
logrus.Errorf("[-] Error while playing output using %s: %v", cmd, err)
}
}()
go func() {
err := handleTranscribe("output.mp3", "transcription.txt")
if err != nil {
logrus.Errorf("[-] Error while transcribing audio: %v", err)
}
}()

// TODO: perhaps rm output.mp3?
}
}

Expand All @@ -169,22 +178,21 @@ func handleSpeak(response string) {
func handleTranscribe(audioFile string, txtFile string) error {
key := os.Getenv("OPENAI_API_KEY")
if key == "" {
log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.")
return errors.New("OPENAI_API_KEY is not set")
return errors.New("OPENAI_API_KEY is not set. Please set the environment variable and try again.")
}
client := openai.NewClient(key)
ctx := context.Background()
req := openai.AudioRequest{
Model: openai.Whisper1,
FilePath: audioFile,
}

resp, err := client.CreateTranscription(ctx, req)
if err != nil {
fmt.Printf("Transcription error: %v\n", err)
return err
} else {
handleSaveFile(txtFile, resp.Text)
}

handleSaveFile(txtFile, resp.Text)
return nil
}

Expand Down
24 changes: 18 additions & 6 deletions cmd/masa-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@ package main
import (
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/workers"
)

func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) {
func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) {
// WorkerManager configuration
// XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons
workerManagerOptions := []workers.WorkerOptionFunc{}
// TODO: this needs to be moved under config, but now it's here as there are import cycles given singletons
workerManagerOptions := []workers.WorkerOptionFunc{
workers.WithLlmChatUrl(cfg.LLMChatUrl),
workers.WithMasaDir(cfg.MasaDir),
}

cachePath := cfg.CachePath
if cachePath == "" {
cachePath = cfg.MasaDir + "/cache"
}

masaNodeOptions := []node.Option{
node.EnableStaked,
Expand All @@ -19,6 +28,10 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana
node.WithVersion(cfg.Version),
node.WithPort(cfg.PortNbr),
node.WithBootNodes(cfg.Bootnodes...),
node.WithMasaDir(cfg.MasaDir),
node.WithCachePath(cachePath),
node.WithLLMCloudFlareURL(cfg.LLMCfUrl),
node.WithKeyManager(keyManager),
}

if cfg.TwitterScraper {
Expand Down Expand Up @@ -50,8 +63,7 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana
blockChainEventTracker := node.NewBlockChain()
pubKeySub := &pubsub.PublicKeySubscriptionHandler{}

// TODO: Where the config is involved, move to the config the generation of
// Node options
// TODO: Where the config is involved, move to the config the generation of Node options
masaNodeOptions = append(masaNodeOptions, []node.Option{
// Register the worker manager
node.WithMasaProtocolHandler(
Expand All @@ -68,7 +80,7 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana
// and other peers can do work we only need to check this here
// if this peer can or cannot scrape or write that is checked in other places
masaNodeOptions = append(masaNodeOptions,
node.WithService(blockChainEventTracker.Start(config.GetInstance().MasaDir)),
node.WithService(blockChainEventTracker.Start(cfg.MasaDir)),
)
}

Expand Down
25 changes: 14 additions & 11 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ func main() {
cfg.LogConfig()
cfg.SetupLogging()

keyManager := masacrypto.KeyManagerInstance()
keyManager, err := masacrypto.NewKeyManager(cfg.PrivateKey, cfg.PrivateKeyFile)
if err != nil {
logrus.Fatal("[-] Failed to initialize keys:", err)
}

// Create a cancellable context
ctx, cancel := context.WithCancel(context.Background())

if cfg.Faucet {
err := handleFaucet(keyManager.EcdsaPrivKey)
err := handleFaucet(cfg.RpcUrl, keyManager.EcdsaPrivKey)
if err != nil {
logrus.Errorf("[-] %v", err)
os.Exit(1)
Expand All @@ -51,7 +54,7 @@ func main() {
}

if cfg.StakeAmount != "" {
err := handleStaking(keyManager.EcdsaPrivKey)
err := handleStaking(cfg.RpcUrl, keyManager.EcdsaPrivKey, cfg.StakeAmount)
if err != nil {
logrus.Warningf("%v", err)
} else {
Expand All @@ -61,7 +64,7 @@ func main() {
}

// Verify the staking event
isStaked, err := staking.VerifyStakingEvent(keyManager.EthAddress)
isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, keyManager.EthAddress)
if err != nil {
logrus.Error(err)
}
Expand All @@ -70,18 +73,15 @@ func main() {
logrus.Warn("No staking event found for this address")
}

isValidator := cfg.Validator

masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg)
masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg, keyManager)
// Create a new OracleNode
masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...)

if err != nil {
logrus.Fatal(err)
}

err = masaNode.Start()
if err != nil {
if err = masaNode.Start(); err != nil {
logrus.Fatal(err)
}

Expand All @@ -98,7 +98,7 @@ func main() {
}

// Init cache resolver
db.InitResolverCache(masaNode, keyManager)
db.InitResolverCache(masaNode, keyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator)

// Cancel the context when SIGINT is received
go handleSignals(cancel, masaNode, cfg)
Expand All @@ -118,8 +118,11 @@ func main() {
// Get the multiaddress and IP address of the node
multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress
ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address
if err != nil {
logrus.Errorf("[-] Error while getting node IP address from %v: %v", multiAddr, err)
}
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)

<-ctx.Done()
}
Expand Down
Loading

0 comments on commit 2c5f819

Please sign in to comment.