diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index cd155353..b99516e3 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -23,6 +23,9 @@ import ( func main() { + logrus.SetLevel(logrus.DebugLevel) + logrus.Debug("Log level set to Debug") + if len(os.Args) > 1 && os.Args[1] == "--version" { logrus.Infof("Masa Oracle Node Version: %s\nMasa Oracle Protocol verison: %s", versioning.ApplicationVersion, versioning.ProtocolVersion) os.Exit(0) diff --git a/pkg/api/api.go b/pkg/api/api.go index 1274033d..98c7adb3 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -5,17 +5,33 @@ import ( "strconv" "github.com/gin-gonic/gin" + "github.com/sirupsen/logrus" masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/pkg/event" ) type API struct { - Node *masa.OracleNode + Node *masa.OracleNode + EventTracker *event.EventTracker } // NewAPI creates a new API instance with the given OracleNode. func NewAPI(node *masa.OracleNode) *API { - return &API{Node: node} + eventTracker := event.NewEventTracker(nil) + if eventTracker == nil { + logrus.Error("Failed to create EventTracker") + } else { + logrus.Debug("EventTracker created successfully") + } + + api := &API{ + Node: node, + EventTracker: eventTracker, + } + + logrus.Debugf("Created API instance with EventTracker: %v", api.EventTracker) + return api } // GetPathInt converts the path parameter with name to an int. diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index 49bbe225..5f702071 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -22,6 +22,7 @@ import ( "github.com/masa-finance/masa-oracle/pkg/chain" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/event" pubsub2 "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/scrapers/discord" "github.com/masa-finance/masa-oracle/pkg/scrapers/telegram" @@ -152,79 +153,39 @@ func handleTimeout(c *gin.Context) { c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"}) } -// GetLLMModelsHandler returns a gin.HandlerFunc that retrieves the available LLM models. -// It does not expect any request parameters. -// The handler returns a JSON response containing an array of supported LLM model names. -func (api *API) GetLLMModelsHandler() gin.HandlerFunc { - return func(c *gin.Context) { - models := []string{ - string(config.Models.ClaudeOpus), - string(config.Models.ClaudeSonnet), - string(config.Models.ClaudeHaiku), - string(config.Models.GPT4), - string(config.Models.GPT4o), - string(config.Models.GPT4Turbo), - string(config.Models.GPT35Turbo), - string(config.Models.LLama2), - string(config.Models.LLama3), - string(config.Models.Mistral), - string(config.Models.Gemma), - string(config.Models.Mixtral), - string(config.Models.OpenChat), - string(config.Models.NeuralChat), - string(config.Models.CloudflareQwen15Chat), - string(config.Models.CloudflareLlama27bChatFp16), - string(config.Models.CloudflareLlama38bInstruct), - string(config.Models.CloudflareMistral7bInstruct), - string(config.Models.CloudflareMistral7bInstructV01), - string(config.Models.CloudflareOpenchat350106), - string(config.Models.CloudflareMicrosoftPhi2), - string(config.Models.HuggingFaceGoogleGemma7bIt), - string(config.Models.HuggingFaceNousresearchHermes2ProMistral7b), - string(config.Models.HuggingFaceTheblokeLlama213bChatAwq), - string(config.Models.HuggingFaceTheblokeNeuralChat7bV31Awq), - } - c.JSON(http.StatusOK, gin.H{"models": models}) - } -} - -// SearchTweetsAndAnalyzeSentiment method adjusted to match the pattern -// Models Supported: -// -// chose a model or use "all" -func (api *API) SearchTweetsAndAnalyzeSentiment() gin.HandlerFunc { +// SearchTweetsProfile returns a gin.HandlerFunc that processes a request to search for tweets from a specific user profile. +// It expects a URL parameter "username" representing the Twitter username to search for. +// The handler validates the username, ensuring it is provided. +// If the request is valid, it attempts to scrape the user's profile and tweets. +// On success, it returns the scraped profile information in a JSON response. On failure, it returns an appropriate error message and HTTP status code. +func (api *API) SearchTweetsProfile() gin.HandlerFunc { return func(c *gin.Context) { - - if !api.Node.IsStaked { - c.JSON(http.StatusBadRequest, gin.H{"error": "Node has not staked and cannot participate"}) - return - } var reqBody struct { - Query string `json:"query"` - Count int `json:"count"` - Model string `json:"model"` + Username string `json:"username"` } - if err := c.ShouldBindJSON(&reqBody); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) + if c.Param("username") == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Username must be provided and valid"}) return } + reqBody.Username = c.Param("username") - if reqBody.Query == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Query parameter is missing"}) - return - } - if reqBody.Count <= 0 { - reqBody.Count = 50 // Default count - } - if reqBody.Model == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Model parameter is missing. Available models are claude-3-opus-20240229, claude-3-sonnet-20240229, claude-3-haiku-20240307, gpt-4, gpt-4-turbo-preview, gpt-3.5-turbo"}) - return + // Track work request event + if api.EventTracker != nil && api.Node != nil { + peerID := api.Node.Host.ID().String() + payload, err := json.Marshal(reqBody) + if err != nil { + logrus.Errorf("Failed to marshal request body for event tracking: %v", err) + } else { + api.EventTracker.TrackWorkRequest("SearchTweetsProfile", peerID, string(payload), event.DataSourceTwitter) + } + } else { + logrus.Warn("EventTracker or Node is nil in SearchTweetsProfile") } // worker handler implementation - bodyBytes, wErr := json.Marshal(reqBody) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } requestID := uuid.New().String() responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) @@ -232,105 +193,47 @@ func (api *API) SearchTweetsAndAnalyzeSentiment() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - wErr = SendWorkRequest(api, requestID, data_types.TwitterSentiment, bodyBytes, wg) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) + err = SendWorkRequest(api, requestID, data_types.TwitterProfile, bodyBytes, wg) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } wg.Wait() } } -// SearchDiscordMessagesAndAnalyzeSentiment processes a request to search Discord messages and analyze sentiment. -func (api *API) SearchDiscordMessagesAndAnalyzeSentiment() gin.HandlerFunc { +// SearchTweetsRecent returns a gin.HandlerFunc that processes a request to search for tweets based on a query and count. +// It expects a JSON body with fields "query" (string) and "count" (int), representing the search query and the number of tweets to return, respectively. +// The handler validates the request body, ensuring the query is not empty and the count is positive. +// If the request is valid, it attempts to scrape tweets using the specified query and count. +// On success, it returns the scraped tweets in a JSON response. On failure, it returns an appropriate error message and HTTP status code. +func (api *API) SearchTweetsRecent() gin.HandlerFunc { return func(c *gin.Context) { - if !api.Node.IsStaked { - c.JSON(http.StatusBadRequest, gin.H{"error": "Node has not staked and cannot participate"}) - return - } - var reqBody struct { - ChannelID string `json:"channelID"` - Prompt string `json:"prompt"` - Model string `json:"model"` - } - if err := c.ShouldBindJSON(&reqBody); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) - return - } - - if reqBody.ChannelID == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "ChannelID parameter is missing"}) - return - } - - if reqBody.Prompt == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "ChannelID parameter is missing"}) - return - } - - if reqBody.Model == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Model parameter is missing"}) - return - } - - bodyBytes, wErr := json.Marshal(reqBody) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) - return - } - requestID := uuid.New().String() - responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) - wg := &sync.WaitGroup{} - defer workers.GetResponseChannelMap().Delete(requestID) - go handleWorkResponse(c, responseCh, wg) - - wErr = SendWorkRequest(api, requestID, data_types.DiscordSentiment, bodyBytes, wg) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) - return - } - wg.Wait() - - } -} - -// SearchTelegramMessagesAndAnalyzeSentiment processes a request to search Telegram messages and analyze sentiment. -func (api *API) SearchTelegramMessagesAndAnalyzeSentiment() gin.HandlerFunc { - return func(c *gin.Context) { - if !api.Node.IsStaked { - c.JSON(http.StatusBadRequest, gin.H{"error": "Node has not staked and cannot participate"}) - return + Query string `json:"query"` + Count int `json:"count"` } - var reqBody struct { - Username string `json:"username"` // Telegram usernames are used instead of channel IDs - Prompt string `json:"prompt"` - Model string `json:"model"` - } if err := c.ShouldBindJSON(&reqBody); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) return } - if reqBody.Username == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Username parameter is missing"}) - return - } - - if reqBody.Prompt == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Prompt parameter is missing"}) + if reqBody.Query == "" || reqBody.Count <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "Query and count must be provided and valid"}) return } - if reqBody.Model == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Model parameter is missing"}) - return + if api.EventTracker != nil && api.Node != nil { + peerID := api.Node.Host.ID().String() + payload, _ := json.Marshal(reqBody) + api.EventTracker.TrackWorkRequest("SearchTweetsRecent", peerID, string(payload), event.DataSourceTwitter) + } else { + logrus.Warn("EventTracker or Node is nil in SearchTweetsRecent") } - bodyBytes, wErr := json.Marshal(reqBody) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) - return + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } requestID := uuid.New().String() responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) @@ -338,86 +241,48 @@ func (api *API) SearchTelegramMessagesAndAnalyzeSentiment() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - wErr = SendWorkRequest(api, requestID, data_types.TelegramSentiment, bodyBytes, wg) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) - return + err = SendWorkRequest(api, requestID, data_types.Twitter, bodyBytes, wg) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } wg.Wait() } } -// SearchWebAndAnalyzeSentiment returns a gin.HandlerFunc that processes web search requests and performs sentiment analysis. -// It first validates the request body for required fields such as URL, Depth, and Model. If the Model is set to "all", -// it iterates through all available models to perform sentiment analysis on the web content fetched from the specified URL. -// The function responds with the sentiment analysis results in JSON format.// Models Supported: -// Models Supported: +// SearchTwitterFollowers returns a gin.HandlerFunc that retrieves the followers of a given Twitter user. // -// chose a model or use "all" -func (api *API) SearchWebAndAnalyzeSentiment() gin.HandlerFunc { +// Dev Notes: +// - This function uses URL parameters to get the username. +// - The default count is set to 20 if not provided. +func (api *API) SearchTwitterFollowers() gin.HandlerFunc { return func(c *gin.Context) { - - if !api.Node.IsStaked { - c.JSON(http.StatusBadRequest, gin.H{"error": "Node has not staked and cannot participate"}) - return - } var reqBody struct { - Url string `json:"url"` - Depth int `json:"depth"` - Model string `json:"model"` - } - if err := c.ShouldBindJSON(&reqBody); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) - return + Username string `json:"username"` + Count int `json:"count"` } - if reqBody.Url == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "URL parameter is missing"}) - return - } - if reqBody.Depth <= 0 { - reqBody.Depth = 1 // Default count - } - if reqBody.Model == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Model parameter is missing. Available models are claude-3-opus-20240229, claude-3-sonnet-20240229, claude-3-haiku-20240307, gpt-4, gpt-4-turbo-preview, gpt-3.5-turbo"}) + username := c.Param("username") + if username == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Username parameter is missing"}) return } - - // worker handler implementation - bodyBytes, wErr := json.Marshal(reqBody) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) - } - - requestID := uuid.New().String() - responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) - wg := &sync.WaitGroup{} - defer workers.GetResponseChannelMap().Delete(requestID) - go handleWorkResponse(c, responseCh, wg) - - wErr = SendWorkRequest(api, requestID, data_types.WebSentiment, bodyBytes, wg) - if wErr != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()}) + reqBody.Username = username + if reqBody.Count == 0 { + reqBody.Count = 20 } - wg.Wait() - } -} -// SearchTweetsProfile returns a gin.HandlerFunc that processes a request to search for tweets from a specific user profile. -// It expects a URL parameter "username" representing the Twitter username to search for. -// The handler validates the username, ensuring it is provided. -// If the request is valid, it attempts to scrape the user's profile and tweets. -// On success, it returns the scraped profile information in a JSON response. On failure, it returns an appropriate error message and HTTP status code. -func (api *API) SearchTweetsProfile() gin.HandlerFunc { - return func(c *gin.Context) { - var reqBody struct { - Username string `json:"username"` - } - if c.Param("username") == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Username must be provided and valid"}) - return + // Track work request event + if api.EventTracker != nil && api.Node != nil { + peerID := api.Node.Host.ID().String() + payload, err := json.Marshal(reqBody) + if err != nil { + logrus.Errorf("Failed to marshal request body for event tracking: %v", err) + } else { + api.EventTracker.TrackWorkRequest("SearchTwitterFollowers", peerID, string(payload), event.DataSourceTwitter) + } + } else { + logrus.Warn("EventTracker or Node is nil in SearchTwitterFollowers") } - reqBody.Username = c.Param("username") // worker handler implementation bodyBytes, err := json.Marshal(reqBody) @@ -430,7 +295,7 @@ func (api *API) SearchTweetsProfile() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.TwitterProfile, bodyBytes, wg) + err = SendWorkRequest(api, requestID, data_types.TwitterFollowers, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -691,124 +556,6 @@ func (api *API) SearchAllGuilds() gin.HandlerFunc { } } -// ExchangeDiscordTokenHandler returns a gin.HandlerFunc that exchanges a Discord OAuth2 authorization code for an access token. -//func (api *API) ExchangeDiscordTokenHandler() gin.HandlerFunc { -// return func(c *gin.Context) { -// code := c.Param("code") -// if code == "" { -// c.JSON(http.StatusBadRequest, gin.H{"error": "Authorization code must be provided"}) -// return -// } -// -// tokenResponse, err := discord.ExchangeCode(code) -// if err != nil { -// c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to exchange authorization code for access token", "details": err.Error()}) -// return -// } -// -// c.JSON(http.StatusOK, tokenResponse) -// } -//} - -// SearchTwitterFollowers returns a gin.HandlerFunc that retrieves the followers of a given Twitter user. -func (api *API) SearchTwitterFollowers() gin.HandlerFunc { - return func(c *gin.Context) { - var reqBody struct { - Username string `json:"username"` - Count int `json:"count"` - } - - username := c.Param("username") // Assuming you're using a URL parameter for the username - if username == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "Username parameter is missing"}) - return - } - reqBody.Username = username - if reqBody.Count == 0 { - reqBody.Count = 20 - } - - // worker handler implementation - bodyBytes, err := json.Marshal(reqBody) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - } - requestID := uuid.New().String() - responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) - wg := &sync.WaitGroup{} - defer workers.GetResponseChannelMap().Delete(requestID) - go handleWorkResponse(c, responseCh, wg) - - err = SendWorkRequest(api, requestID, data_types.TwitterFollowers, bodyBytes, wg) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - } - wg.Wait() - } -} - -// SearchTweetsRecent returns a gin.HandlerFunc that processes a request to search for tweets based on a query and count. -// It expects a JSON body with fields "query" (string) and "count" (int), representing the search query and the number of tweets to return, respectively. -// The handler validates the request body, ensuring the query is not empty and the count is positive. -// If the request is valid, it attempts to scrape tweets using the specified query and count. -// On success, it returns the scraped tweets in a JSON response. On failure, it returns an appropriate error message and HTTP status code. -func (api *API) SearchTweetsRecent() gin.HandlerFunc { - return func(c *gin.Context) { - var reqBody struct { - Query string `json:"query"` - Count int `json:"count"` - } - - if err := c.ShouldBindJSON(&reqBody); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request body"}) - return - } - - if reqBody.Query == "" || reqBody.Count <= 0 { - c.JSON(http.StatusBadRequest, gin.H{"error": "Query and count must be provided and valid"}) - return - } - - // worker handler implementation - bodyBytes, err := json.Marshal(reqBody) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - } - requestID := uuid.New().String() - responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) - wg := &sync.WaitGroup{} - defer workers.GetResponseChannelMap().Delete(requestID) - go handleWorkResponse(c, responseCh, wg) - - err = SendWorkRequest(api, requestID, data_types.Twitter, bodyBytes, wg) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - } - wg.Wait() - } -} - -// SearchTweetsTrends returns a gin.HandlerFunc that processes a request to search for trending tweets. -// It does not expect any request parameters. -// The handler attempts to scrape trending tweets using the ScrapeTweetsByTrends function. -// On success, it returns the scraped tweets in a JSON response. On failure, it returns an appropriate error message and HTTP status code. -func (api *API) SearchTweetsTrends() gin.HandlerFunc { - return func(c *gin.Context) { - // worker handler implementation - requestID := uuid.New().String() - responseCh := workers.GetResponseChannelMap().CreateChannel(requestID) - wg := &sync.WaitGroup{} - defer workers.GetResponseChannelMap().Delete(requestID) - go handleWorkResponse(c, responseCh, wg) - - err := SendWorkRequest(api, requestID, data_types.TwitterTrends, nil, wg) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - } - wg.Wait() - } -} - // WebData returns a gin.HandlerFunc that processes web scraping requests. // It expects a JSON body with fields "url" (string) and "depth" (int), representing the URL to scrape and the depth of the scrape, respectively. // The handler validates the request body, ensuring the URL is not empty and the depth is positive. diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 0a9ed7aa..df5dd8c7 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -207,16 +207,6 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine { // @Example safeSearch {"query": "Masa filter:safe", "count": 10} v1.POST("/data/twitter/tweets/recent", API.SearchTweetsRecent()) - // @Summary Twitter Trends - // @Description Retrieves the latest Twitter trending topics - // @Tags Twitter - // @Accept json - // @Produce json - // @Success 200 {array} Trend "List of trending topics" - // @Failure 400 {object} ErrorResponse "Error fetching Twitter trends" - // @Router /data/twitter/tweets/trends [get] - v1.GET("/data/twitter/tweets/trends", API.SearchTweetsTrends()) - // @Summary Search Discord Profile // @Description Retrieves a Discord user profile by user ID. // @Tags Discord @@ -343,16 +333,6 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine { // @Router /dht [post] v1.POST("/dht", API.PostToDHT()) - // @Summary Get LLM Models - // @Description Retrieves the available LLM models - // @Tags LLM - // @Accept json - // @Produce json - // @Success 200 {object} LLMModelsResponse "Successfully retrieved LLM models" - // @Failure 400 {object} ErrorResponse "Error retrieving LLM models" - // @Router /llm/models [get] - v1.GET("/llm/models", API.GetLLMModelsHandler()) - // @Summary Node Data // @Description Retrieves data from the node // @Tags Node @@ -406,50 +386,6 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine { // @Router /publickey/publish [post] v1.POST("/publickey/publish", API.PublishPublicKeyHandler()) - // @Summary Analyze Sentiment of Tweets - // @Description Searches for tweets and analyzes their sentiment - // @Tags Sentiment - // @Accept json - // @Produce json - // @Param query body string true "Search Query" - // @Success 200 {object} SentimentAnalysisResponse "Successfully analyzed sentiment of tweets" - // @Failure 400 {object} ErrorResponse "Error analyzing sentiment of tweets" - // @Router /sentiment/tweets [post] - v1.POST("/sentiment/tweets", API.SearchTweetsAndAnalyzeSentiment()) - - // @Summary Analyze Sentiment of Tweets - // @Description Searches for tweets and analyzes their sentiment - // @Tags Sentiment - // @Accept json - // @Produce json - // @Param query body string true "Prompt" - // @Success 200 {object} SentimentAnalysisResponse "Successfully analyzed sentiment of discord" - // @Failure 400 {object} ErrorResponse "Error analyzing sentiment of discord" - // @Router /sentiment/tweets [post] - v1.POST("/sentiment/discord", API.SearchDiscordMessagesAndAnalyzeSentiment()) - - // @Summary Analyze Sentiment of Telegram Messages - // @Description Searches for Telegram messages and analyzes their sentiment - // @Tags Sentiment - // @Accept json - // @Produce json - // @Param query body string true "Search Query" - // @Success 200 {object} SentimentAnalysisResponse "Successfully analyzed sentiment of Telegram messages" - // @Failure 400 {object} ErrorResponse "Error analyzing sentiment of Telegram messages" - // @Router /sentiment/telegram [post] - v1.POST("/sentiment/telegram", API.SearchTelegramMessagesAndAnalyzeSentiment()) - - // @Summary Analyze Sentiment of Web Content - // @Description Searches for web content and analyzes its sentiment - // @Tags Sentiment - // @Accept json - // @Produce json - // @Param query body string true "Search Query" - // @Success 200 {object} SentimentAnalysisResponse "Successfully analyzed sentiment of web content" - // @Failure 400 {object} ErrorResponse "Error analyzing sentiment of web content" - // @Router /sentiment/web [post] - v1.POST("/sentiment/web", API.SearchWebAndAnalyzeSentiment()) - // @Summary Create New Topic // @Description Creates a new discussion topic // @Tags Topics diff --git a/pkg/event/README.md b/pkg/event/README.md new file mode 100644 index 00000000..30b9df96 --- /dev/null +++ b/pkg/event/README.md @@ -0,0 +1,81 @@ +# Masa Protocol Event Tracking Package + +A Go package for tracking and sending analytics events. + +## Features + +- In-memory event storage +- Configurable event sending to external API +- Thread-safe operations +- Comprehensive logging with logrus +- Convenience methods for common event types + +## Usage + +```go +import "github.com/masa-finance/masa-oracle/pkg/event" + +// Create a new event tracker with default config +tracker := event.NewEventTracker(nil) + +// Track a custom event +tracker.TrackEvent("custom_event", map[string]interface{}{"key": "value"}) + +// Use convenience method to track and send a login event +client := event.NewEventClient("https://api.example.com", logger, 10*time.Second) +err := tracker.TrackUserLogin("user123", client) +if err != nil { + log.Fatal(err) +} + +// Retrieve all tracked events +events := tracker.GetEvents() + +// Clear all tracked events +tracker.ClearEvents() +``` + +## Event Library + +The package provides a set of predefined events for common scenarios: + +### Work Distribution + +```go +func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string, client *EventClient) error +``` + +Tracks the distribution of work to a worker. Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `remote_worker`: Boolean indicating if it's a remote worker + +### Work Completion + +```go +func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string, client *EventClient) error +``` + +Records the completion of a work item. Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `success`: Boolean indicating if the work was successful + +### Worker Failure + +```go +func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string, client *EventClient) error +``` + +Records a failure that occurred during work execution. Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `error`: String containing the error message + +## Contributing + +Contributions are welcome! Please submit a pull request or create an issue for any bugs or feature requests. + +## License + +This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details. \ No newline at end of file diff --git a/pkg/event/config.go b/pkg/event/config.go new file mode 100644 index 00000000..cedb57f9 --- /dev/null +++ b/pkg/event/config.go @@ -0,0 +1,33 @@ +package event + +import "time" + +const ( + // APIVersion is the version of the analytics API + APIVersion = "v1" + + // DefaultBaseURL is the default URL for the external API + DefaultBaseURL = "http://127.0.0.1:8081" + + // DefaultHTTPTimeout is the default timeout for HTTP requests + DefaultHTTPTimeout = 10 * time.Second + + // MaxEventsInMemory is the maximum number of events to keep in memory + MaxEventsInMemory = 1000 +) + +// Config holds the configuration for the analytics package +type Config struct { + BaseURL string + HTTPTimeout time.Duration + LogLevel string +} + +// DefaultConfig returns the default configuration +func DefaultConfig() *Config { + return &Config{ + BaseURL: DefaultBaseURL, + HTTPTimeout: DefaultHTTPTimeout, + LogLevel: "info", + } +} diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 00000000..07c8c1d2 --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,138 @@ +package event + +import ( + "errors" + "fmt" + "sync" + + "github.com/sirupsen/logrus" +) + +const ( + WorkRequest = "work_request" + WorkCompletion = "work_completion" + WorkFailure = "worker_failure" + WorkDistribution = "work_distribution" + WorkExecutionStart = "work_execution_start" + WorkExecutionTimeout = "work_execution_timeout" + RemoteWorkerConnection = "remote_work_connection" + StreamCreation = "stream_creation" + WorkRequestSerialization = "work_request_serialized" + WorkResponseDeserialization = "work_response_serialized" + LocalWorkerFallback = "local_work_executed" + DataSourceTwitter = "twitter" + DataSourceDiscord = "discord" + DataSourceWeb = "web" + DataSourceTelegram = "telegram" +) + +type Event struct { + Name string `json:"name"` + PeerID string `json:"peer_id"` + Payload string `json:"payload"` + DataSource string `json:"data_source"` + WorkType string `json:"work_type"` + RemoteWorker bool `json:"remote_worker"` + Success bool `json:"success"` + Error string `json:"error"` +} + +type EventTracker struct { + events []Event + mu sync.Mutex + logger *logrus.Logger + config *Config + apiClient *EventClient +} + +func NewEventTracker(config *Config) *EventTracker { + if config == nil { + config = DefaultConfig() + } + logger := logrus.New() + logger.SetLevel(logrus.InfoLevel) + if level, err := logrus.ParseLevel(config.LogLevel); err == nil { + logger.SetLevel(level) + } + return &EventTracker{ + events: make([]Event, 0), + logger: logger, + config: config, + apiClient: NewEventClient(config.BaseURL, logger, config.HTTPTimeout), + } +} + +func (a *EventTracker) TrackEvent(event Event) { + if a == nil { + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + a.events = append(a.events, event) + a.logger.WithFields(logrus.Fields{ + "event_name": event.Name, + "data": event, + }).Info("Event tracked") +} + +func (a *EventTracker) GetEvents() []Event { + if a == nil { + return nil + } + + a.mu.Lock() + defer a.mu.Unlock() + + return append([]Event{}, a.events...) +} + +func (a *EventTracker) ClearEvents() { + if a == nil { + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + a.events = make([]Event, 0) + a.logger.Info("Events cleared") +} + +func (a *EventTracker) TrackAndSendEvent(event Event, client *EventClient) error { + a.mu.Lock() + defer a.mu.Unlock() + + a.events = append(a.events, event) + a.logger.WithFields(logrus.Fields{ + "event_name": event.Name, + "data": event, + }).Info("Event tracked") + + if client != nil { + return client.SendEvent(event) + } else { + if a.apiClient != nil { + err := validateEvent(event) + if err != nil { + return err + } + return a.apiClient.SendEvent(event) + } + } + return fmt.Errorf("no client available") +} + +func validateEvent(event Event) error { + if event.Name == "" { + return errors.New("Event name is required") + } + if event.PeerID == "" { + return errors.New("Peer ID is required") + } + if event.WorkType == "" { + return errors.New("Work type is required") + } + return nil +} diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go new file mode 100644 index 00000000..2d36cc23 --- /dev/null +++ b/pkg/event/event_client.go @@ -0,0 +1,63 @@ +package event + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/sirupsen/logrus" +) + +type EventClient struct { + BaseURL string + HTTPClient *http.Client + Logger *logrus.Logger +} + +func NewEventClient(baseURL string, logger *logrus.Logger, timeout time.Duration) *EventClient { + return &EventClient{ + BaseURL: baseURL, + HTTPClient: &http.Client{Timeout: timeout}, + Logger: logger, + } +} + +func (c *EventClient) SendEvent(event Event) error { + if c == nil { + return fmt.Errorf("EventClient is nil") + } + + url := fmt.Sprintf("%s/%s/events", c.BaseURL, APIVersion) + payload, err := json.Marshal(event) + if err != nil { + c.Logger.WithError(err).Error("Failed to marshal event") + return err + } + + resp, err := c.HTTPClient.Post(url, "application/json", bytes.NewBuffer(payload)) + if err != nil { + c.Logger.WithError(err).Error("Failed to send event") + return err + } + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + + } + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("event service returned non-OK status: %d", resp.StatusCode) + c.Logger.WithError(err).Error("Failed to send event") + return err + } + + c.Logger.WithFields(logrus.Fields{ + "event_name": event.Name, + }).Info("Event sent") + + return nil +} diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go new file mode 100644 index 00000000..c964937a --- /dev/null +++ b/pkg/event/event_library.go @@ -0,0 +1,213 @@ +package event + +import ( + "fmt" + "time" + + "github.com/sirupsen/logrus" +) + +// Add this function to the existing file + +// TrackWorkRequest records when a work request is initiated. +// +// Parameters: +// - workType: String indicating the type of work being requested (e.g., "SearchTweetsRecent") +// - peerId: String containing the peer ID (or client IP in this case) +func (a *EventTracker) TrackWorkRequest(workType string, peerId string, payload string, dataSource string) { + event := Event{ + Name: WorkRequest, + PeerID: peerId, + Payload: payload, + DataSource: dataSource, + WorkType: workType, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work request event: %s", err) + } + + logrus.Infof("[+] %s input: %s", workType, payload) +} + +// TrackWorkDistribution records the distribution of work to a worker. +// +// Parameters: +// - remoteWorker: Boolean indicating if the work is sent to a remote worker (true) or executed locally (false) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkDistribution(remoteWorker bool, peerId string) { + event := Event{ + Name: WorkDistribution, + PeerID: peerId, + WorkType: WorkDistribution, + RemoteWorker: remoteWorker, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work distribution event: %s", err) + } +} + +// TrackWorkCompletion records the completion of a work item. +// +// Parameters: +// - success: Boolean indicating if the work was completed successfully +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkCompletion(success bool, peerId string) { + event := Event{ + Name: WorkCompletion, + PeerID: peerId, + WorkType: WorkCompletion, + Success: success, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work completion event: %s", err) + } +} + +// TrackWorkerFailure records a failure that occurred during work execution. +// +// Parameters: +// - errorMessage: A string describing the error that occurred +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkerFailure(errorMessage string, peerId string) { + event := Event{ + Name: WorkFailure, + PeerID: peerId, + WorkType: WorkFailure, + Error: errorMessage, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking worker failure event: %s", err) + } +} + +// TODO: Do not implement the following for now we can focus only on the baove events + +// TrackWorkExecutionStart records the start of work execution. +// +// Parameters: +// - remoteWorker: Boolean indicating if the work is executed by a remote worker (true) or locally (false) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkExecutionStart(remoteWorker bool, peerId string) { + event := Event{ + Name: WorkExecutionStart, + PeerID: peerId, + WorkType: WorkExecutionStart, + RemoteWorker: remoteWorker, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work execution start event: %s", err) + } +} + +// TrackWorkExecutionTimeout records when work execution times out. +// +// Parameters: +// - timeoutDuration: The duration of the timeout +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkExecutionTimeout(timeoutDuration time.Duration, peerId string) { + event := Event{ + Name: WorkExecutionTimeout, + PeerID: peerId, + WorkType: WorkExecutionTimeout, + Error: fmt.Sprintf("timeout after %s", timeoutDuration), + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work execution timeout event: %s", err) + } +} + +// TrackRemoteWorkerConnection records when a connection is established with a remote worker. +// +// Parameters: +// - peerId: String containing the peer ID +func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) { + event := Event{ + Name: RemoteWorkerConnection, + PeerID: peerId, + WorkType: RemoteWorkerConnection, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking remote worker connection event: %s", err) + } +} + +// TrackStreamCreation records when a new stream is created for communication with a remote worker. +// +// Parameters: +// - peerId: String containing the peer ID +// - protocol: The protocol used for the stream +func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) { + event := Event{ + Name: StreamCreation, + PeerID: peerId, + WorkType: StreamCreation, + Error: protocol, // Assuming protocol is stored in Error field for now + + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking stream creation event: %s", err) + } +} + +// TrackWorkRequestSerialization records when a work request is serialized for transmission. +// +// Parameters: +// - dataSize: The size of the serialized data +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkRequestSerialization(dataSize int, peerId string) { + event := Event{ + Name: WorkRequestSerialization, + PeerID: peerId, + WorkType: WorkRequestSerialization, + Error: fmt.Sprintf("data size: %d", dataSize), // Assuming data size is stored in Error field for now + + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work request serialization event: %s", err) + } +} + +// TrackWorkResponseDeserialization records when a work response is deserialized after reception. +// +// Parameters: +// - success: Boolean indicating if the deserialization was successful +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkResponseDeserialization(success bool, peerId string) { + event := Event{ + Name: WorkResponseDeserialization, + PeerID: peerId, + WorkType: WorkResponseDeserialization, + Success: success, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking work response deserialization event: %s", err) + } +} + +// TrackLocalWorkerFallback records when the system falls back to using a local worker. +// +// Parameters: +// - reason: The reason for the fallback +// - peerId: String containing the peer ID +func (a *EventTracker) TrackLocalWorkerFallback(reason string, peerId string) { + event := Event{ + Name: LocalWorkerFallback, + PeerID: peerId, + WorkType: LocalWorkerFallback, + Error: reason, + } + err := a.TrackAndSendEvent(event, nil) + if err != nil { + logrus.Errorf("error tracking local worker fallback event: %s", err) + } +} diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 533cde97..a4cd379b 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -278,6 +278,13 @@ func (node *OracleNode) handleDiscoveredPeers() { // It reads the stream data, validates the remote peer ID, updates the node tracker // with the remote peer's information, and logs the event. func (node *OracleNode) handleStream(stream network.Stream) { + defer func(stream network.Stream) { + err := stream.Close() + if err != nil { + logrus.Infof("[-] Error closing stream: %v", err) + } + }(stream) + remotePeer, nodeData, err := node.handleStreamData(stream) if err != nil { if strings.HasPrefix(err.Error(), "un-staked") { diff --git a/pkg/oracle_node_listener.go b/pkg/oracle_node_listener.go index 228df490..8c026157 100644 --- a/pkg/oracle_node_listener.go +++ b/pkg/oracle_node_listener.go @@ -146,6 +146,7 @@ func (node *OracleNode) SendNodeData(peerID peer.ID) { logrus.Debugf("[-] Failed to close stream: %v", err) } }(stream) // Ensure the stream is closed after sending the data + logrus.Debugf("[+] Sending %d node data records to %s", totalRecords, peerID) for pageNumber := 0; pageNumber < totalPages; pageNumber++ { node.SendNodeDataPage(nodeData, stream, pageNumber) @@ -156,6 +157,7 @@ func (node *OracleNode) SendNodeData(peerID peer.ID) { // over a network stream. It scans the stream and unmarshals each // page of NodeData, refreshing the local NodeTracker with the data. func (node *OracleNode) ReceiveNodeData(stream network.Stream) { + // The stream is closed by the sender logrus.Debug("[+] ReceiveNodeData") scanner := bufio.NewScanner(stream) for scanner.Scan() { @@ -188,11 +190,9 @@ func (node *OracleNode) ReceiveNodeData(stream network.Stream) { } } } - node.NodeTracker.RefreshFromBoot(nd) } } - if err := scanner.Err(); err != nil { logrus.Errorf("[-] Failed to read stream: %v", err) } @@ -205,6 +205,13 @@ func (node *OracleNode) ReceiveNodeData(stream network.Stream) { // stream when finished. func (node *OracleNode) GossipNodeData(stream network.Stream) { logrus.Info("[+] GossipNodeData") + defer func(stream network.Stream) { + err := stream.Close() + if err != nil { + logrus.Debugf("[-] Failed to close stream: %v", err) + } + }(stream) // Ensure the stream is closed after sending the data + remotePeerId, nodeData, err := node.handleStreamData(stream) if err != nil { logrus.Errorf("[-] Failed to read stream: %v", err) @@ -214,10 +221,6 @@ func (node *OracleNode) GossipNodeData(stream network.Stream) { if remotePeerId.String() != nodeData.PeerId.String() { node.NodeTracker.HandleNodeData(nodeData) } - err = stream.Close() - if err != nil { - logrus.Errorf("[-] Failed to close stream: %v", err) - } } // handleStreamData reads a network stream to get the remote peer ID diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 56c8b49f..4c4827a6 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -16,6 +16,7 @@ import ( masa "github.com/masa-finance/masa-oracle/pkg" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/event" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" @@ -29,7 +30,8 @@ var ( func GetWorkHandlerManager() *WorkHandlerManager { once.Do(func() { instance = &WorkHandlerManager{ - handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), + handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), + eventTracker: event.NewEventTracker(nil), } instance.setupHandlers() }) @@ -53,8 +55,9 @@ type WorkHandlerInfo struct { // WorkHandlerManager manages work handlers and tracks their execution metrics. type WorkHandlerManager struct { - handlers map[data_types.WorkerType]*WorkHandlerInfo - mu sync.RWMutex + handlers map[data_types.WorkerType]*WorkHandlerInfo + mu sync.RWMutex + eventTracker *event.EventTracker } func (whm *WorkHandlerManager) setupHandlers() { @@ -132,9 +135,8 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers) response = whm.sendWorkToWorker(node, worker, workRequest) if response.Error != "" { - errorMsg := fmt.Sprintf("Worker %s: %s", worker.NodeData.PeerId, response.Error) - errors = append(errors, errorMsg) - logrus.Errorf("error sending work to worker: %s", errorMsg) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) + logrus.Errorf("error sending work to worker: %s: %s", response.WorkerPeerId, response.Error) logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId) // Check if the error is related to Twitter authentication @@ -149,7 +151,17 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest // Fallback to local execution if local worker is eligible and all remote workers failed if localWorker != nil { + var reason string + if len(remoteWorkers) > 0 { + reason = "all remote workers failed" + } else { + reason = "no remote workers available" + } + whm.eventTracker.TrackLocalWorkerFallback(reason, localWorker.AddrInfo.ID.String()) + response = whm.ExecuteWork(workRequest) + whm.eventTracker.TrackWorkCompletion(response.Error == "", localWorker.AddrInfo.ID.String()) + if response.Error != "" { errors = append(errors, fmt.Sprintf("Local worker: %s", response.Error)) } else { @@ -172,12 +184,15 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da if err := node.Host.Connect(ctxWithTimeout, *worker.AddrInfo); err != nil { response.Error = fmt.Sprintf("failed to connect to remote peer %s: %v", worker.AddrInfo.ID.String(), err) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } else { + //whm.eventTracker.TrackRemoteWorkerConnection(worker.AddrInfo.ID.String()) logrus.Debugf("[+] Connection established with node: %s", worker.AddrInfo.ID.String()) stream, err := node.Host.NewStream(ctxWithTimeout, worker.AddrInfo.ID, config.ProtocolWithVersion(config.WorkerProtocol)) if err != nil { response.Error = fmt.Sprintf("error opening stream: %v", err) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } // the stream should be closed by the receiver, but keeping this here just in case @@ -204,14 +219,16 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da _, err = stream.Write(bytes) if err != nil { response.Error = fmt.Sprintf("error writing to stream: %v", err) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } - + whm.eventTracker.TrackWorkDistribution(true, worker.AddrInfo.ID.String()) // Read the response length lengthBuf = make([]byte, 4) _, err = io.ReadFull(stream, lengthBuf) if err != nil { response.Error = fmt.Sprintf("error reading response length: %v", err) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } responseLength := binary.BigEndian.Uint32(lengthBuf) @@ -292,7 +309,7 @@ func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream) { defer func(stream network.Stream) { err := stream.Close() if err != nil { - logrus.Errorf("[-] Error closing stream in handler: %s", err) + logrus.Infof("[-] Error closing stream in handler: %s", err) } }(stream) @@ -325,6 +342,7 @@ func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream) { logrus.Errorf("error from remote worker %s: executing work: %s", peerId, workResponse.Error) } workResponse.WorkerPeerId = peerId + whm.eventTracker.TrackWorkCompletion(workResponse.Error == "", peerId) // Write the response to the stream responseBytes, err := json.Marshal(workResponse)