Skip to content

Commit

Permalink
Refactor error handling and JSON tags in worker types
Browse files Browse the repository at this point in the history
Updated the error field in WorkResponse to a string as it should have been. Also, added JSON tags to struct fields for better JSON serialization. updated assignment of Error in handler functions to use formatted strings to conform to the data type change.
  • Loading branch information
restevens402 committed Aug 15, 2024
1 parent e618eec commit 840f32d
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 59 deletions.
22 changes: 18 additions & 4 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ func handleWorkResponse(c *gin.Context, responseCh chan data_types.WorkResponse,
for {
select {
case response := <-responseCh:
if response.Error != nil {
if response.Error != "" {
c.JSON(http.StatusExpectationFailed, response)
wg.Done()
return
}
if data, ok := response.Data.(string); ok && IsBase64(data) {
Expand All @@ -139,16 +140,14 @@ func handleWorkResponse(c *gin.Context, responseCh chan data_types.WorkResponse,
}
response.Data = jsonData
}
response.WorkRequest = data_types.WorkRequest{}
response.WorkRequest = &data_types.WorkRequest{}
c.JSON(http.StatusOK, response)
wg.Done()
return
case <-time.After(cfg.WorkerResponseTimeout):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out in API layer"})
wg.Done()
return
case <-c.Done():
wg.Done()
return
}
}
Expand Down Expand Up @@ -238,6 +237,7 @@ func (api *API) SearchTweetsAndAnalyzeSentiment() gin.HandlerFunc {
if wErr != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -290,6 +290,8 @@ func (api *API) SearchDiscordMessagesAndAnalyzeSentiment() gin.HandlerFunc {
c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()})
return
}
wg.Wait()

}
}

Expand Down Expand Up @@ -342,6 +344,7 @@ func (api *API) SearchTelegramMessagesAndAnalyzeSentiment() gin.HandlerFunc {
c.JSON(http.StatusBadRequest, gin.H{"error": wErr.Error()})
return
}
wg.Wait()
}
}

Expand Down Expand Up @@ -432,6 +435,7 @@ func (api *API) SearchTweetsProfile() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -468,6 +472,7 @@ func (api *API) SearchDiscordProfile() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -514,6 +519,7 @@ func (api *API) SearchChannelMessages() gin.HandlerFunc {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
wg.Wait()
}
}

Expand Down Expand Up @@ -546,6 +552,7 @@ func (api *API) SearchGuildChannels() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand All @@ -569,6 +576,7 @@ func (api *API) SearchUserGuilds() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -736,6 +744,7 @@ func (api *API) SearchTwitterFollowers() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -776,6 +785,7 @@ func (api *API) SearchTweetsRecent() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand All @@ -796,6 +806,7 @@ func (api *API) SearchTweetsTrends() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -844,6 +855,7 @@ func (api *API) WebData() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -928,6 +940,7 @@ func (api *API) GetChannelMessagesHandler() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down Expand Up @@ -983,6 +996,7 @@ func (api *API) LocalLlmChat() gin.HandlerFunc {
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
wg.Wait()
}
}

Expand Down
34 changes: 24 additions & 10 deletions pkg/workers/handlers/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,56 +20,70 @@ func (h *DiscordProfileHandler) HandleWork(data []byte) data_types.WorkResponse
logrus.Infof("[+] DiscordProfileHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse discord json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse discord json data: %v", err)}
}
userID := dataMap["userID"].(string)
resp, err := discord.GetUserProfile(userID)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Data: resp, Error: fmt.Sprintf("unable to get discord user profile: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

// HandleWork implements the WorkHandler interface for DiscordChannelHandler.
func (h *DiscordChannelHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] DiscordChannelHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse discord json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse discord json data: %v", err)}
}
channelID := dataMap["channelID"].(string)
limit := dataMap["limit"].(string)
prompt := dataMap["prompt"].(string)
resp, err := discord.GetChannelMessages(channelID, limit, prompt)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get discord channel messages: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

// HandleWork implements the WorkHandler interface for DiscordSentimentHandler.
func (h *DiscordSentimentHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] DiscordSentimentHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse discord json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse discord json data: %v", err)}
}
channelID := dataMap["channelID"].(string)
model := dataMap["model"].(string)
prompt := dataMap["prompt"].(string)
_, resp, err := discord.ScrapeDiscordMessagesForSentiment(channelID, model, prompt)

return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get discord channel messages: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

// HandleWork implements the WorkHandler interface for DiscordGuildHandler.
func (h *DiscordGuildHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] DiscordGuildHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse discord json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse discord json data: %v", err)}
}
guildID := dataMap["guildID"].(string)
resp, err := discord.GetGuildChannels(guildID)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get discord guild channels: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

func (h *DiscoreUserGuildsHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] DiscordUserGuildsHandler %s", data)
resp, err := discord.GetUserGuilds()
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get discord user guilds: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}
12 changes: 7 additions & 5 deletions pkg/workers/handlers/llm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package handlers

import (
"encoding/json"
"errors"
"fmt"

"github.com/sirupsen/logrus"
Expand All @@ -29,18 +28,21 @@ func (h *LLMChatHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] LLM Chat %s", data)
uri := config.GetInstance().LLMChatUrl
if uri == "" {
return data_types.WorkResponse{Error: errors.New("missing env variable LLM_CHAT_URL")}
return data_types.WorkResponse{Error: "missing env variable LLM_CHAT_URL"}
}

var dataMap map[string]interface{}
if err := json.Unmarshal(data, &dataMap); err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse LLM chat data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse LLM chat data: %v", err)}
}

jsnBytes, err := json.Marshal(dataMap)
if err != nil {
return data_types.WorkResponse{Error: err}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to marshal LLM chat data: %v", err)}
}
resp, err := Post(uri, jsnBytes, nil)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to post LLM chat data: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}
14 changes: 10 additions & 4 deletions pkg/workers/handlers/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@ func (h *TelegramSentimentHandler) HandleWork(data []byte) data_types.WorkRespon
logrus.Infof("[+] TelegramSentimentHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse telegram json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse telegram json data: %v", err)}
}
userName := dataMap["username"].(string)
model := dataMap["model"].(string)
prompt := dataMap["prompt"].(string)
_, resp, err := telegram.ScrapeTelegramMessagesForSentiment(context.Background(), userName, model, prompt)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get telegram sentiment: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

// HandleWork implements the WorkHandler interface for TelegramChannelHandler.
func (h *TelegramChannelHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TelegramChannelHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse telegram json data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse telegram json data: %v", err)}
}
userName := dataMap["username"].(string)
resp, err := telegram.FetchChannelMessages(context.Background(), userName)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get telegram channel messages: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}
33 changes: 24 additions & 9 deletions pkg/workers/handlers/twitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,67 @@ func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TwitterQueryHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse twitter query data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter query data: %v", err)}
}
count := int(dataMap["count"].(float64))
query := dataMap["query"].(string)
resp, err := twitter.ScrapeTweetsByQuery(query, count)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter query: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

func (h *TwitterFollowersHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TwitterFollowersHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse twitter followers data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter followers data: %v", err)}
}
username := dataMap["username"].(string)
count := int(dataMap["count"].(float64))
resp, err := twitter.ScrapeFollowersForProfile(username, count)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter followers: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

func (h *TwitterProfileHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TwitterProfileHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse twitter profile data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter profile data: %v", err)}
}
username := dataMap["username"].(string)
resp, err := twitter.ScrapeTweetsProfile(username)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter profile: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

func (h *TwitterSentimentHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TwitterSentimentHandler %s", data)
dataMap, err := JsonBytesToMap(data)
if err != nil {
return data_types.WorkResponse{Error: fmt.Errorf("unable to parse twitter sentiment data: %v", err)}
return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter sentiment data: %v", err)}
}
count := int(dataMap["count"].(float64))
query := dataMap["query"].(string)
model := dataMap["model"].(string)
_, resp, err := twitter.ScrapeTweetsForSentiment(query, count, model)
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter sentiment: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}

func (h *TwitterTrendsHandler) HandleWork(data []byte) data_types.WorkResponse {
logrus.Infof("[+] TwitterTrendsHandler %s", data)
resp, err := twitter.ScrapeTweetsByTrends()
return data_types.WorkResponse{Data: resp, Error: err}
if err != nil {
return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter trends: %v", err)}
}
return data_types.WorkResponse{Data: resp}
}
Loading

0 comments on commit 840f32d

Please sign in to comment.