Skip to content

Commit

Permalink
fix: multiaddress not constructed on startup (#524)
Browse files Browse the repository at this point in the history
* Refactor network address handling and improve IP retrieval

Restructure how multiaddresses and IP addresses are handled within the codebase to improve maintainability and efficiency. Added logic for obtaining GCP external IP addresses and restructured public address retrieval. Moved HTTP client functionality to a new package and updated references accordingly.

* Update max remote workers

* Improve worker selection resilience

- Handle invalid multiaddresses gracefully
- Continue searching for eligible workers on errors
- Add more detailed logging for debugging
- Prevent potential nil pointer dereference
- Log warning if no workers found

* Remove IsActive and timeout checks from CanDoWork method

This commit simplifies the worker eligibility criteria in the CanDoWork method
of the NodeData struct. The following changes were made:

- Removed the check for node active status (IsActive)
- Removed the worker timeout check
- Retained the check for staked status (IsStaked)

The method now only considers if a node is staked and configured for the
specific worker type when determining eligibility. This change allows for
more inclusive worker participation, as nodes are no longer excluded based
on active status or timeout conditions.

* extend context deadline timeout for a connection attempt

* Add `MergeMultiaddresses` method and update nodes management

Introduce a `MergeMultiaddresses` method to `NodeData` to handle multiaddress management more efficiently. Update the oracle node logic to use this method for merging incoming multiaddresses instead of replacing them, and add a new `NewWorker` function to initialize workers, enhancing logging and error handling for multiaddress processing.

* Switch to DHT for peer address lookup

Replaced the multiaddress-based peer information retrieval with a DHT lookup to simplify the process. Removed unnecessary imports and optimized the code for finding and connecting to peers in the Distributed Hash Table (DHT).

* Improve Twitter API rate limit error handling and propagation

- Modified ScrapeTweetsByQuery to immediately return rate limit errors
- Updated TwitterQueryHandler to properly propagate scraper errors
- Adjusted handleWorkResponse to correctly handle and return errors to the client
- Ensured rate limit errors are logged and returned with appropriate HTTP status codes
- Improved error message clarity for better debugging and user feedback

This commit enhances the system's ability to detect, log, and respond to Twitter API rate limit errors, providing clearer feedback to both developers and end-users when such limits are encountered.

* chore: remove unused publishWorkRequest function

- Deleted the publishWorkRequest function from pkg/api/handlers_data.go
- This function was not being used in the current codebase
- Removing it simplifies the code and reduces maintenance overhead

* refactor: handleWorkResponse with functional programming concepts

- Decompose handleWorkResponse into smaller, focused functions
- Introduce higher-order function for error response handling
- Separate concerns for improved modularity and testability
- Reduce mutable state and side effects where possible
- Maintain idiomatic Go while incorporating functional principles
- Improve error handling granularity and response structure

---------

Co-authored-by: Bob Stevens <[email protected]>
  • Loading branch information
2 people authored and mudler committed Sep 17, 2024
1 parent 5543bfb commit 417b693
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 232 deletions.
8 changes: 5 additions & 3 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os/signal"
"syscall"

"github.com/multiformats/go-multiaddr"

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/workers"

Expand Down Expand Up @@ -134,10 +136,10 @@ func main() {
}()

// Get the multiaddress and IP address of the node
multiAddr := node.GetMultiAddrs().String() // Get the multiaddress
ipAddr := node.Host.Addrs()[0].String() // Get the IP address
multiAddr := node.GetMultiAddrs() // Get the multiaddress
ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr, ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)

<-ctx.Done()
}
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22.0

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0
github.com/dgraph-io/badger v1.6.2
github.com/ethereum/go-ethereum v1.14.8
Expand Down Expand Up @@ -128,7 +127,6 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kennygrant/sanitize v1.2.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U=
github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c h1:++BhWlmSX+n8m3O4gPfy3S4PTZ0TMzH6nelerBLPUng=
github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c/go.mod h1:C7ma6h458jTWT65mXC58L1Q6hnEtr0unur8cMc0UEXM=
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
Expand Down Expand Up @@ -372,8 +370,6 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
Expand Down
119 changes: 59 additions & 60 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/masa-finance/masa-oracle/pkg/scrapers/discord"
"github.com/masa-finance/masa-oracle/pkg/scrapers/telegram"
"github.com/masa-finance/masa-oracle/pkg/workers"
"github.com/masa-finance/masa-oracle/pkg/workers/types"
data_types "github.com/masa-finance/masa-oracle/pkg/workers/types"
)

type LLMChat struct {
Expand Down Expand Up @@ -78,30 +78,6 @@ func SendWorkRequest(api *API, requestID string, workType data_types.WorkerType,
return nil
}

// SendWorkRequest sends a work request to the PubSubManager for processing by a worker.
// It marshals the request details into JSON and publishes it to the configured topic.
//
// Parameters:
// - api: The API instance containing the Node and PubSubManager.
// - requestID: A unique identifier for the request.
// - request: The type of work to be performed by the worker.
// - bodyBytes: The request body in byte slice format.
//
// Returns:
// - error: An error object if the request could not be published, otherwise nil.
func publishWorkRequest(api *API, requestID string, request data_types.WorkerType, bodyBytes []byte) error {
workRequest := map[string]string{
"request": string(request),
"request_id": requestID,
"body": string(bodyBytes),
}
jsn, err := json.Marshal(workRequest)
if err != nil {
return err
}
return api.Node.PubSubManager.Publish(config.TopicWithVersion(config.WorkerTopic), jsn)
}

// handleWorkResponse processes the response from a worker and sends it back to the client.
// It listens on the provided response channel for a response or a timeout signal.
// If a response is received within the timeout period, it unmarshals the JSON response and sends it back to the client.
Expand All @@ -110,49 +86,72 @@ func publishWorkRequest(api *API, requestID string, request data_types.WorkerTyp
// Parameters:
// - c: The gin.Context object, which provides the context for the HTTP request.
// - responseCh: A channel that receives the worker's response as a byte slice.
func handleWorkResponse(c *gin.Context, responseCh chan data_types.WorkResponse, wg *sync.WaitGroup) {
func handleWorkResponse(c *gin.Context, responseCh <-chan data_types.WorkResponse, wg *sync.WaitGroup) {
cfg, err := LoadConfig()
if err != nil {
logrus.Errorf("Failed to load API cfg: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"})
handleError(c, "Failed to load API cfg", err)
return
}

for {
select {
case response := <-responseCh:
if response.Error != "" {
c.JSON(http.StatusExpectationFailed, response)
wg.Done()
return
}
if data, ok := response.Data.(string); ok && IsBase64(data) {
decodedData, err := base64.StdEncoding.DecodeString(response.Data.(string))
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to decode base64 data"})
return
}
var jsonData map[string]interface{}
err = json.Unmarshal(decodedData, &jsonData)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to parse JSON data"})
return
}
response.Data = jsonData
}
response.WorkRequest = nil
c.JSON(http.StatusOK, response)
wg.Done()
return
case <-time.After(cfg.WorkerResponseTimeout):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"})
return
case <-c.Done():
return
}
select {
case response := <-responseCh:
handleResponse(c, response, wg)
case <-time.After(cfg.WorkerResponseTimeout):
handleTimeout(c)
case <-c.Done():
// Context cancelled, no action needed
}
}

func handleResponse(c *gin.Context, response data_types.WorkResponse, wg *sync.WaitGroup) {
defer wg.Done()

if response.Error != "" {
handleErrorResponse(c, response)
return
}

if response.Data == nil {
c.JSON(http.StatusNotFound, gin.H{
"error": "No data returned",
"workerPeerId": response.WorkerPeerId,
})
return
}

c.JSON(http.StatusOK, response)
}

func handleErrorResponse(c *gin.Context, response data_types.WorkResponse) {
logrus.Errorf("[+] Work error: %s", response.Error)

errorResponse := func(status int, message string) {
c.JSON(status, gin.H{
"error": message,
"details": response.Error,
"workerPeerId": response.WorkerPeerId,
})
}

switch {
case strings.Contains(response.Error, "Twitter API rate limit exceeded (429 error)"):
errorResponse(http.StatusTooManyRequests, "Twitter API rate limit exceeded")
case strings.Contains(response.Error, "no workers could process"):
errorResponse(http.StatusServiceUnavailable, "No available workers to process the request")
default:
errorResponse(http.StatusInternalServerError, "An error occurred while processing the request")
}
}

func handleError(c *gin.Context, message string, err error) {
logrus.Errorf("%s: %v", message, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"})
}

func handleTimeout(c *gin.Context) {
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"})
}

// GetLLMModelsHandler returns a gin.HandlerFunc that retrieves the available LLM models.
// It does not expect any request parameters.
// The handler returns a JSON response containing an array of supported LLM model names.
Expand Down
Loading

0 comments on commit 417b693

Please sign in to comment.