From 03f6b2cfcf507477c8550610412b7dbea5ab253c Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 17:24:36 -0700 Subject: [PATCH 1/8] hotfix: swagger bug fixes (#511) * fix: swagger documentation * chore: cleanup files * chore: swagger annotations * chore: update swagger --- docs/MASAPROTOCOL.md | 77 --- docs/docs.go | 1361 +++------------------------------------ docs/issues.md | 184 ------ pkg/api/routes.go | 24 +- pkg/db/operations.go | 9 +- pkg/scrapers/web/web.go | 52 +- 6 files changed, 133 insertions(+), 1574 deletions(-) delete mode 100644 docs/MASAPROTOCOL.md delete mode 100644 docs/issues.md diff --git a/docs/MASAPROTOCOL.md b/docs/MASAPROTOCOL.md deleted file mode 100644 index 92c0f291..00000000 --- a/docs/MASAPROTOCOL.md +++ /dev/null @@ -1,77 +0,0 @@ -# Masa: Decentralized Data Protocol - -## Introduction - -Masa is a groundbreaking solution in the web3 space, addressing the pressing need for a unified data layer that encapsulates a user's holistic behavior and identity data. In the decentralized realm, while the promise of privacy and control over one's data is paramount, the absence of a comprehensive data layer has led to fragmented experiences and inefficiencies. Masa Oracle aims to bridge this gap, introducing a privacy-first approach that intertwines identity, behaviorial, onchain, and social graph data in a unified and secure manner. - -## Problem Statement - -In the burgeoning era of decentralized applications and platforms, the lack of a unified data layer stands out as a significant challenge. This absence means that a user's complete behavioral and identity data remains scattered across various platforms, leading to: - -1. No user-level analytics for marketers, product managers, and founders. -2. Stagnant ecosystem growth due to a lack of data-driven insights. -3. Fragmented user experiences. -4. Challenges in user authentication and identity verification. -5. 
Potential breaches of privacy and data security. - -Moreover, while web3 promises unparalleled privacy and control, the absence of a consolidated data infrastructure often compromises these very tenets. Users find themselves navigating a labyrinth of platforms, each with its data storage and management mechanisms, leading to inefficiencies and vulnerabilities. - -## Masa's Data Sources - -The Masa Protocol leverages three core data sources, each meticulously architected to respect and enhance user privacy: - -1. **Offchain Behavioral Data**: Using a proprietary cookieless tracking mechanism, Masa captures nuanced behavioral data without resorting to conventional cookies. This unique tracking ensures that users' online behaviors are understood without infringing on their privacy. - -2. **User Permissioned Offchain Data**: Masa taps into a wealth of information from platforms like Discord, Twitter, and through processes like Identity Verification and Sanctions Checks. Expressed onchain using zkSBTs (Zero Knowledge Soulbound Tokens) tokenized data is composable with blockchain infrastructure. However, this data is only accessed with explicit user permission, ensuring a user-centric approach. - -3. **Onchain Data**: Masa delves into on-chain data sources to gain insights into asset ownership, historical balances, transactions, and DID credentials amongs other features. - -Leveraging its proprietary and privacy preserving cookieless tracking, Masa achieves a deeper resolution of a user in web3, transcending the limitations of mere address identifications. By associating all of a user's addresses through device sessions and creating a singular Masa Identity, a comprehensive behavioral view of a user is formed. This Masa Identity, with its relationships with device sessions and addresses, paints a complete picture of a user, spanning their on-chain and off-chain behaviors and interactions. 
This secret sauce for the first time is able to deterministically measure active users in web3. - -## Masa Protocol: The Solution - -The Masa Protocol, encompassing the Masa Oracle and other components, offers a robust solution to this challenge. It aims to provide a unified, secure, and privacy-preserving data layer that places the user at its core. - -- **Unified Data Layer**: The Masa Protocol consolidates a user's behavioral and identity data, ensuring that it remains accessible yet private, irrespective of its source (on-chain or off-chain). - -- **Privacy By Default**: Instead of retrofitting privacy as an afterthought, the Masa Protocol ensures that a user's data remains private by default. Only the user holds the key to their data universe, deciding what gets shared and with whom. - -- **Opt-In Data Movement**: While the default stance is privacy, users have the autonomy to opt into moving their data on-chain. This movement isn't just a mere transfer; it's executed in a privacy-preserving manner using Masa's zkSBT (Zero-Knowledge Soulbound Token). This ensures that even on-chain, the user's data remains private, visible only to entities the user explicitly trusts. - -- **Data Monetization**: Through opting in to sharing privacy preserved data the Masa protocol enables a decentralized behavioral and identity marketplace that rewards uses whenever their data is used by a third party. For example, training a AI agent model for serving customer support documentation at critical steps in a user journey through an app. - -## The Masa Oracle: Bridging Gaps - -Acting as the linchpin of the Masa Protocol, the Masa Oracle bridges the on-chain and off-chain worlds. It serves as a conduit, channeling a tokenized representation of a user's complete identity. This identity encapsulates all the relationships a user has, spanning accounts and data both on and off-chain. 
The Oracle's use of zkSBT ensures that this representation, while comprehensive, never compromises on privacy. The Masa Oracle enables querying and sharing of private and anonymous data with any third party in a fully decentralized architecture. - -## Node Incentivization through Masa Tokens - -### Earning Masa Tokens - -Masa Oracle rewards its participating nodes with its native token, Masa. These tokens are issued as an incentive for nodes to actively contribute and maintain the health of the decentralized data network. By staking and providing essential services, nodes ensure the integrity and availability of data, receiving Masa tokens as compensation. - -### Revenue from Data Requests - -Beyond the foundational rewards in Masa tokens, nodes have the opportunity to earn additional revenue by fulfilling data requests. As the digital age progresses, the demand for private and anonymous behavioral data is surging, especially from stakeholders such as smart contracts, developers, and data engineers aiming to train decentralized AI models. By catering to these data requests, nodes tap into a lucrative revenue stream. - -#### Revenue Conversion - -Revenue garnered from data requests in various native currencies, such as ETH, MATIC, USDC, or other native tokens, doesn't remain fragmented. Instead, Masa Oracle employs a DEX (Decentralized Exchange) to convert these earnings into Masa tokens. This not only simplifies the reward mechanism but also ensures that node operators consistently augment their Masa holdings. - -## Protocol Governance & Voting - -Masa Oracle isn't just about data distribution; it's also about community-driven growth and evolution. Node operators who stake Masa tokens gain the privilege to partake in protocol governance. This decentralized governance approach ensures that every staking member has a voice. 
From proposing enhancements to voting on pivotal protocol decisions, staking in the Masa Oracle ecosystem empowers node operators to sculpt the future of decentralized data protocols. - -## Use Cases - -1. **Behavioral Analytics**: Projects can tap into the power of behavioral data to understand their users better, tailor experiences, and drive growth. With Masa, this doesn't come at the cost of privacy. - -2. **Data-Driven Decentralized AI**: By providing a unified data layer, Masa fuels the next generation of AI models in the decentralized space. Data scientists can leverage this rich data, training AI models that are both powerful and privacy-preserving. - -3. **Governance and Community Building**: With the comprehensive view that Masa provides, platforms can foster stronger communities. They can understand user needs better, drive engagement, and even facilitate governance mechanisms that are truly representative of the community's desires. - -4. **Decentralized Identity Verification**: With Masa Oracle, platforms can seamlessly verify a user's identity without compromising on their privacy. From simple sign-ins to complex identity checks, Masa streamlines the process. 
- -## Conclusion - -TBD diff --git a/docs/docs.go b/docs/docs.go index 94d35e82..92b76626 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -48,1212 +48,6 @@ const docTemplate = `{ "name": "username", "in": "path", "required": true - "schemes": {{ marshal .Schemes }}, - "swagger": "2.0", - "info": { - "description": "{{escape .Description}}", - "title": "{{.Title}}", - "contact": { - "name": "Masa API Support", - "url": "https://masa.ai", - "email": "support@masa.ai" - }, - "license": { - "name": "MIT", - "url": "https://opensource.org/license/mit" - }, - "version": "{{.Version}}" - }, - "host": "{{.Host}}", - "basePath": "{{.BasePath}}", - "securityDefinitions": { - "Bearer": { - "type": "apiKey", - "name": "Authorization", - "in": "header" - } - }, - "security": [ - { - "Bearer": [] - } - ], - "paths": { - "/peers": { - "get": { - "description": "Retrieves a list of peers connected to the node", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Peers" - ], - "summary": "Get list of peers", - "responses": { - "200": { - "description": "List of peer IDs", - "schema": { - "type": "array", - "items": { - "type": "string" - } - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/peer/addresses": { - "get": { - "description": "Retrieves a list of peer addresses connected to the node", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Peers" - ], - "summary": "Get peer addresses", - "responses": { - "200": { - "description": "List of peer addresses", - "schema": { - "type": "array", - "items": { - "type": "string" - } - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/twitter/profile/{username}": { - "get": { - "description": "Retrieves tweets from a specific Twitter profile", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Twitter" - ], - "summary": "Search Twitter Profile", - 
"parameters": [ - { - "type": "string", - "description": "Twitter Username", - "name": "username", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "List of tweets from the profile", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Tweet" - } - } - }, - "400": { - "description": "Invalid username or error fetching tweets", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/twitter/followers/{username}": { - "get": { - "description": "Retrieves followers from a specific Twitter profile.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Twitter" - ], - "summary": "Search Followers by Twitter Username", - "parameters": [ - { - "type": "string", - "description": "Twitter Username", - "name": "username", - "in": "path", - "required": true - }, - { - "type": "integer", - "description": "Maximum number of users to return", - "name": "count", - "in": "query", - "required": false, - "default": 20 - } - ], - "responses": { - "200": { - "description": "Array of profiles a user has as followers", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Profile" - } - } - }, - "400": { - "description": "Invalid username or error fetching followers", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/data/twitter/tweets/recent": { - "post": { - "description": "Retrieves recent tweets based on query parameters", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Twitter" - ], - "summary": "Search recent tweets", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Search parameters", - "required": true, - "schema": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Search Query" - }, - "count": { - "type": "integer", - "description": "Number of 
tweets to return" - } - } - } - } - ], - "responses": { - "200": { - "description": "List of recent tweets", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Tweet" - } - } - }, - "400": { - "description": "Invalid query or error fetching tweets", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/twitter/tweets/trends": { - "get": { - "description": "Retrieves the latest Twitter trending topics", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Twitter" - ], - "summary": "Twitter Trends", - "responses": { - "200": { - "description": "List of trending topics", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Trend" - } - } - }, - "400": { - "description": "Error fetching Twitter trends", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/discord/profile/{userID}": { - "get": { - "description": "Retrieves a Discord user profile by user ID.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Discord" - ], - "summary": "Search Discord Profile", - "parameters": [ - { - "name": "userID", - "in": "path", - "description": "Discord User ID", - "required": true, - "type": "string" - } - ], - "responses": { - "200": { - "description": "Successfully retrieved Discord user profile", - "schema": { - "$ref": "#/definitions/UserProfile" - } - }, - "400": { - "description": "Invalid user ID or error fetching profile", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/discord/channels/{channelID}/messages": { - "get": { - "description": "Retrieves messages from a specified Discord channel.", - "tags": ["Discord"], - "summary": "Get messages from a Discord channel", - "parameters": [ - { - "name": "channelID", - 
"in": "path", - "description": "Discord Channel ID", - "required": true, - "type": "string" - }, - { - "name": "limit", - "in": "query", - "description": "The maximum number of messages to return", - "required": false, - "type": "integer", - "format": "int32" - }, - { - "name": "before", - "in": "query", - "description": "A message ID to return messages posted before this message", - "required": false, - "type": "string" - } - ], - "responses": { - "200": { - "description": "Successfully retrieved messages from the Discord channel", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/ChannelMessage" - } - } - }, - "400": { - "description": "Invalid channel ID or error fetching messages", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/discord/guilds/{guildID}/channels": { - "get": { - "description": "Retrieves channels from a specified Discord guild.", - "tags": ["Discord"], - "summary": "Get channels from a Discord guild", - "parameters": [ - { - "name": "guildID", - "in": "path", - "description": "Discord Guild ID", - "required": true, - "type": "string" - } - ], - "responses": { - "200": { - "description": "Successfully retrieved channels from the Discord guild", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/GuildChannel" - } - } - }, - "400": { - "description": "Invalid guild ID or error fetching channels", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/discord/user/guilds": { - "get": { - "description": "Retrieves guilds from a specified Discord user.", - "tags": ["Discord"], - "summary": "Get guilds from a Discord user", - "parameters": [ - ], - "responses": { - "200": { - "description": "Successfully retrieved guilds from the Discord user", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/UserGuild" - } - } - }, - "400": { - 
"description": "Invalid user ID or error fetching guilds", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/discord/guilds/all": { - "get": { - "description": "Retrieves all guilds that all the Discord workers are apart of.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Discord" - ], - "summary": "Get all guilds", - "responses": { - "200": { - "description": "Successfully retrieved all guilds for the Discord user", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Guild" - } - } - }, - "400": { - "description": "Error fetching guilds or invalid access token", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/telegram/channel/messages": { - "post": { - "description": "Retrieves messages from a specified Telegram channel.", - "tags": ["Telegram"], - "summary": "Get Telegram Channel Messages", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Request body", - "required": true, - "schema": { - "type": "object", - "properties": { - "username": { - "type": "string", - "description": "Telegram Username" - } - }, - "required": ["username"] - } - } - ], - "responses": { - "200": { - "description": "Successfully retrieved messages", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Message" - } - } - }, - "400": { - "description": "Invalid username or error fetching messages", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/web": { - "post": { - "description": "Retrieves data from the web", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Web" - ], - "summary": "Web Data", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Search parameters", - 
"required": true, - "schema": { - "type": "object", - "properties": { - "url": { - "type": "string", - "description": "Url" - }, - "depth": { - "type": "integer", - "description": "Number of pages to scrape" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully retrieved web data", - "schema": { - "$ref": "#/definitions/WebDataResponse" - } - }, - "400": { - "description": "Invalid query or error fetching web data", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/dht": { - "get": { - "description": "Retrieves data from the DHT (Distributed Hash Table)", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "DHT" - ], - "summary": "Get DHT Data", - "parameters": [ - { - "in": "query", - "name": "key", - "description": "Key to retrieve data for", - "required": true, - "type": "string" - } - ], - "responses": { - "200": { - "description": "Successfully retrieved data from DHT", - "schema": { - "$ref": "#/definitions/DHTResponse" - } - }, - "400": { - "description": "Error retrieving data from DHT", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - }, - "post": { - "description": "Adds data to the DHT (Distributed Hash Table)", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "DHT" - ], - "summary": "Post to DHT", - "parameters": [ - { - "description": "Data to store in DHT", - "name": "data", - "in": "body", - "required": true, - "schema": { - "type": "object", - "properties": { - "key": { - "type": "string" - }, - "value": { - "type": "string" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully added data to DHT", - "schema": { - "$ref": "#/definitions/SuccessResponse" - } - }, - "400": { - "description": "Error adding data to DHT", - "schema": { - "$ref": "#/definitions/ErrorResponse" - 
} - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/llm/models": { - "get": { - "description": "Retrieves the available LLM models", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "LLM" - ], - "summary": "Get LLM Models", - "responses": { - "200": { - "description": "Successfully retrieved LLM models", - "schema": { - "$ref": "#/definitions/LLMModelsResponse" - } - }, - "400": { - "description": "Error retrieving LLM models", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/chat": { - "post": { - "summary": "Chat with AI", - "description": "Initiates a chat session with an AI model.", - "consumes": ["application/json"], - "produces": ["application/json"], - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Chat request payload", - "required": true, - "schema": { - "type": "object", - "properties": { - "model": { - "type": "string", - "example": "llama3" - }, - "messages": { - "type": "array", - "items": { - "type": "object", - "properties": { - "role": { - "type": "string", - "example": "user" - }, - "content": { - "type": "string", - "example": "why is the sky blue?" 
- } - } - } - }, - "stream": { - "type": "boolean", - "example": false - } - }, - "required": ["model", "messages", "stream"] - } - } - ], - "responses": { - "200": { - "description": "Successfully received response from AI", - "schema": { - "$ref": "#/definitions/ChatResponse" - } - }, - "400": { - "description": "Error communicating with AI", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/node/data": { - "get": { - "description": "Retrieves data from the node", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Node" - ], - "summary": "Node Data", - "responses": { - "200": { - "description": "Successfully retrieved node data", - "schema": { - "$ref": "#/definitions/NodeDataResponse" - } - }, - "400": { - "description": "Error retrieving node data", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/node/data/{peerid}": { - "get": { - "description": "Retrieves data for a specific node identified by peer ID", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Node" - ], - "summary": "Get Node Data by Peer ID", - "parameters": [ - { - "type": "string", - "description": "Peer ID", - "name": "peerid", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "Successfully retrieved node data by peer ID", - "schema": { - "$ref": "#/definitions/NodeDataResponse" - } - }, - "400": { - "description": "Error retrieving node data by peer ID", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/sentiment/tweets": { - "post": { - "description": "Searches for tweets and analyzes their sentiment", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Sentiment" - ], - "summary": "Analyze Sentiment of Tweets", - "parameters": [ - { - 
"in": "body", - "name": "body", - "description": "Sentiment analysis request body", - "required": true, - "schema": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Search Query" - }, - "count": { - "type": "integer", - "description": "Number of tweets to analyze" - }, - "model": { - "type": "string", - "description": "Sentiment analysis model to use" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully analyzed sentiment of tweets", - "schema": { - "$ref": "#/definitions/SentimentAnalysisResponse" - } - }, - "400": { - "description": "Error analyzing sentiment of tweets", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/sentiment/telegram": { - "post": { - "description": "Searches for Telegram messages and analyzes their sentiment", - "tags": ["Sentiment"], - "summary": "Analyze Sentiment of Telegram Messages", - "consumes": ["application/json"], - "produces": ["application/json"], - "parameters": [ - { - "name": "query", - "in": "body", - "description": "Search Query", - "required": true, - "schema": { - "type": "object", - "properties": { - "query": { - "type": "string" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully analyzed sentiment of Telegram messages", - "schema": { - "$ref": "#/definitions/SentimentAnalysisResponse" - } - }, - "400": { - "description": "Error analyzing sentiment of Telegram messages", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/sentiment/discord": { - "post": { - "description": "Searches for Discord messages and analyzes their sentiment", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Sentiment" - ], - "summary": "Analyze Sentiment of Discord Messages", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Sentiment analysis request body for Discord messages", - "required": true, - "schema": { - "type": 
"object", - "properties": { - "channelID": { - "type": "string", - "description": "Discord Channel ID" - }, - "prompt": { - "type": "string", - "description": "Prompt to enter" - }, - "model": { - "type": "string", - "description": "Sentiment analysis model to use" - } - }, - "required": ["channelID", "model"] - } - } - ], - "responses": { - "200": { - "description": "Successfully analyzed sentiment of Discord messages", - "schema": { - "$ref": "#/definitions/SentimentAnalysisResponse" - } - }, - "400": { - "description": "Error analyzing sentiment of Discord messages", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/auth": { - "get": { - "description": "Retrieves the API key for the node", - "produces": [ - "application/json" - ], - "tags": [ - "Authentication" - ], - "summary": "Get Node API Key", - "responses": { - "200": { - "description": "Successfully retrieved API key", - "schema": { - "type": "object", - "additionalProperties": { - "type": "string" - } - } - }, - "500": { - "description": "Error generating API key", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/auth/telegram/start": { - "post": { - "description": "Initiates the authentication process with Telegram by sending a code to the provided phone number.", - "tags": ["Authentication"], - "summary": "Start Telegram Authentication", - "consumes": ["application/json"], - "produces": ["application/json"], - "parameters": [ - { - "name": "phone_number", - "in": "body", - "description": "Phone Number", - "required": true, - "schema": { - "type": "object", - "properties": { - "phone_number": { - "type": "string" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully sent authentication code", - "schema": { - "type": "object", - "additionalProperties": { - "type": "string" - } - } - }, - "400": { - "description": "Invalid request body", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - }, - "500": { - 
"description": "Failed to initialize Telegram client or to start authentication", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/auth/telegram/complete": { - "post": { - "description": "Completes the authentication process with Telegram using the code sent to the phone number.", - "tags": ["Authentication"], - "summary": "Complete Telegram Authentication", - "consumes": ["application/json"], - "produces": ["application/json"], - "parameters": [ - { - "name": "phone_number", - "in": "body", - "description": "Phone Number", - "required": true, - "schema": { - "type": "object", - "properties": { - "phone_number": { - "type": "string" - }, - "code": { - "type": "string" - }, - "phone_code_hash": { - "type": "string" - }, - "password": { - "type": "string", - "description": "Optional password for two-factor authentication" - } - }, - "required": ["phone_number", "code", "phone_code_hash"] - } - } - ], - "responses": { - "200": { - "description": "Successfully authenticated", - "schema": { - "type": "object", - "additionalProperties": { - "type": "string" - } - } - }, - "400": { - "description": "Invalid request body", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - }, - "401": { - "description": "Two-factor authentication is required", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - }, - "500": { - "description": "Failed to initialize Telegram client or to complete authentication", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - "/sentiment/web": { - "post": { - "description": "Searches for web content and analyzes its sentiment", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "Sentiment" - ], - "summary": "Analyze Sentiment of Web Content", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Sentiment analysis request body", - "required": true, - "schema": { - "type": "object", - "properties": { - "url": { - 
"type": "string", - "description": "URL of the web content" - }, - "depth": { - "type": "integer", - "description": "Depth of web crawling" - }, - "model": { - "type": "string", - "description": "Sentiment analysis model to use" - } - } - } - } - ], - "responses": { - "200": { - "description": "Successfully analyzed sentiment of web content", - "schema": { - "$ref": "#/definitions/SentimentAnalysisResponse" - } - }, - "400": { - "description": "Error analyzing sentiment of web content", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - } - } - }, - }, - "DHTResponse": { - "type": "object", - "properties": { - "key": { - "type": "string" - }, - "value": { - "type": "string" - } } ], "responses": { @@ -1325,88 +119,79 @@ const docTemplate = `{ } }, "/data/twitter/tweets/recent": { - "post": { - "description": "Retrieves recent tweets based on query parameters", - "consumes": ["application/json"], - "produces": ["application/json"], - "tags": ["Twitter"], - "summary": "Search recent tweets", - "parameters": [ - { - "in": "body", - "name": "body", - "description": "Search parameters", - "required": true, - "schema": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Search Query" - }, - "count": { - "type": "integer", - "description": "Number of tweets to return" - } - } - } - } - ], - "responses": { - "200": { - "description": "List of recent tweets", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Tweet" - } - } - }, - "400": { - "description": "Invalid query or error fetching tweets", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, - "/data/twitter/tweets/trends": { - "get": { - "description": "Retrieves the latest Twitter trending topics", - "consumes": ["application/json"], - "produces": ["application/json"], - "tags": ["Twitter"], - "summary": "Twitter Trends", - "responses": { - "200": { - "description": "List of 
trending topics", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/Trend" - } - } - }, - "400": { - "description": "Error fetching Twitter trends", - "schema": { - "$ref": "#/definitions/ErrorResponse" - } - } - }, - "security": [ - { - "Bearer": [] - } - ] - } - }, + "post": { + "description": "Retrieves recent tweets based on query parameters, supporting advanced search options", + "consumes": ["application/json"], + "produces": ["application/json"], + "tags": ["Twitter"], + "summary": "Search recent tweets", + "parameters": [ + { + "in": "body", + "name": "body", + "description": "Search parameters", + "required": true, + "schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search Query" + }, + "count": { + "type": "integer", + "description": "Number of tweets to return", + "default": 10 + } + }, + "example": { + "query": "#Bitcoin", + "count": 10 + } + }, + "examples": { + "hashtag": { + "summary": "Search by hashtag", + "value": {"query": "#MasaNode", "count": 10} + }, + "mention": { + "summary": "Search by mention", + "value": {"query": "@getmasafi", "count": 10} + }, + "fromUser": { + "summary": "Search tweets from a user", + "value": {"query": "from:getmasafi", "count": 10} + }, + "language": { + "summary": "Search tweets in a specific language", + "value": {"query": "Masa lang:en", "count": 10} + }, + "dateRange": { + "summary": "Search tweets within a date range", + "value": {"query": "Masa since:2021-01-01 until:2021-12-31", "count": 10} + } + } + } + ], + "responses": { + "200": { + "description": "List of recent tweets", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/Tweet" + } + } + }, + "400": { + "description": "Invalid query or error fetching tweets", + "schema": { + "$ref": "#/definitions/ErrorResponse" + } + } + } + } +}, "/data/discord/profile/{userID}": { "get": { "description": "Retrieves a Discord user profile by user ID.", diff --git 
a/docs/issues.md b/docs/issues.md deleted file mode 100644 index c0d91c49..00000000 --- a/docs/issues.md +++ /dev/null @@ -1,184 +0,0 @@ -# Issue #1: **Setting Up the Basic libp2p Node** - -**Description**: -Initialize a basic `libp2p` node with essential configurations and peer discovery. - -**Tasks**: - -1. Install `libp2p` modules using npm or yarn. -2. Generate the node's identity using `libp2p-crypto`. -3. Define a custom transport protocol stack: TCP for local communication and WebSocket for browser-based interactions. -4. Activate the Kad-DHT module for peer discovery and routing. - -**Technical Considerations**: - -- Ensure the system has adequate resources for DHT since it can be resource-intensive. -- Monitor the node’s CPU and memory usage when enabling multiple transports. - ---- - -## Issue #2: **DHT Integration for Node Registration** - -**Description**: -Integrate a Distributed Hash Table (DHT) for node registration and discovery. - -**Tasks**: - -1. Use `@libp2p/js-libp2p-kad-dht` for DHT integration. -2. Define a protobuf schema for node details to ensure structured data storage in DHT. -3. Design a key-naming scheme to prevent key collisions when multiple nodes register. -4. Implement TTL (time-to-live) for DHT records to manage stale entries. - -**Technical Considerations**: - -- Account for network latency when setting TTL values. -- Ensure the DHT is initialized only after the libp2p node is fully started to avoid initialization errors. - ---- - -### Issue #3: **Ethereum Smart Contract for Masa Token Staking** - -**Description**: -Design and deploy an Ethereum smart contract. - -**Tasks**: - -1. Use the Solidity language for contract development. -2. Integrate with the ERC-20 "masa" token contract to manage staking operations. -3. Utilize OpenZeppelin libraries to ensure security and best practices. -4. Test the contract using `truffle` or `hardhat` on Ethereum testnets like Rinkeby or Ropsten. 
- -**Technical Considerations**: - -- Plan for gas optimization in the smart contract functions. -- Implement secure randomization for any reward distribution to deter attackers. - ---- - -### Issue #4: **Integrate Node Validation with Masa Token Staking** - -**Description**: -Connect the `libp2p` node with Ethereum for staking operations. - -**Tasks**: - -1. Utilize `web3.js` or `ethers.js` for Ethereum integration. -2. Store the Ethereum RPC endpoint securely, consider using environment variables. -3. Periodically poll the smart contract to validate the node's staked balance. -4. Automate the process of updating the node's status in DHT based on staking status. - -**Technical Considerations**: - -- Ensure Ethereum RPC calls are efficient to avoid rate limits. -- Cache staking status locally to reduce unnecessary Ethereum network calls. - ---- - -### Issue #5: **Setup Webhook Data Receipt Mechanism** - -**Description**: -Set up the `libp2p` node to accept incoming webhook data. - -**Tasks**: - -1. Implement a RESTful HTTP server, consider using `Express.js`. -2. Use middleware like `body-parser` for JSON payload parsing. -3. Store incoming data in a queue (like RabbitMQ) for asynchronous processing. -4. Add error handlers for malformed payloads. - -**Technical Considerations**: - -- Implement rate limiting to prevent abuse. -- Ensure that the HTTP server is isolated from the main node process for security. - ---- - -### Issue #6: **Data Propagation Across Nodes using libp2p** - -**Description**: -Manage data propagation to other nodes. - -**Tasks**: - -1. Use `libp2p`'s Gossipsub or Floodsub for data propagation. -2. Serialize data using `protobuf` for efficient data transfer. -3. Implement a retry mechanism for failed data transfers. -4. Confirm data receipt with ACK messages. - -**Technical Considerations**: - -- Monitor bandwidth usage to prevent network congestion. -- Implement message compression to reduce data transfer sizes. 
- ---- - -### Issue #7: **Implement Data Persistence with DHT Database** - -**Description**: -Store webhook data persistently. - -**Tasks**: - -1. Install and configure `DHT Database`. -2. Define a data structure/schema with indexes for efficient querying. -3. Implement batch writes for improved performance. - -**Technical Considerations**: - -- Monitor disk I/O to ensure optimal performance. -- Encrypt sensitive data before storing. - ---- - -### Issue #8: **Ethereum Transaction Management** - -**Description**: -Manage Ethereum transactions. - -**Tasks**: - -1. Calculate optimal gas prices using services like `ETH Gas Station`. -2. Implement nonce management to avoid transaction collisions. -3. Use Ethereum events or logs for transaction status monitoring. -4. Implement a gas increment strategy for transaction resubmission. - -**Technical Considerations**: - -- Account for Ethereum network congestion when calculating gas prices. -- Store transaction hashes for future reference and auditing. - ---- - -### Issue #9: **Node Rewarding System** - -**Description**: -Facilitate node reward claims. - -**Tasks**: - -1. Monitor the Ethereum contract's events using web3.js or ethers.js event listeners. -2. Automate reward claims using smart contract function calls. -3. Implement a local ledger system to keep track of earned rewards. - -**Technical Considerations**: - -- Monitor for "out of gas" errors when claiming rewards. -- Sync the local ledger with the on-chain data periodically. - ---- - -### Issue #10: **Security Enhancements** - -**Description**: -Secure the system against threats. - -**Tasks**: - -1. Use `libp2p`'s SECIO or Noise for encrypted peer communication. -2. Implement rate limiting on all endpoints. -3. Regularly audit the Ethereum smart contract using tools like `Mythril` or `Slither`. - -**Technical Considerations**: - -- Monitor for any unusual node behavior or communication patterns. 
-- Update cryptographic libraries regularly to patch any vulnerabilities. diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 9bcf4160..0a9ed7aa 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -181,14 +181,30 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine { v1.GET("/data/twitter/profile/:username", API.SearchTweetsProfile()) // @Summary Search recent tweets - // @Description Retrieves recent tweets based on query parameters + // @Description Retrieves recent tweets based on query parameters, supporting advanced search options // @Tags Twitter - // @Accept json - // @Produce json - // @Param query string true "Search Query" + // @Accept json + // @Produce json + // @Param body body object true "Search Query" // @Success 200 {array} Tweet "List of recent tweets" // @Failure 400 {object} ErrorResponse "Invalid query or error fetching tweets" // @Router /data/twitter/tweets/recent [post] + // @Param body body object true "Search Query" SchemaExample({"query": "#MasaNode", "count": 10}) + // @Example hashtag {"query": "#MasaNode", "count": 10} + // @Example mention {"query": "@getmasafi", "count": 10} + // @Example fromUser {"query": "from:getmasafi", "count": 10} + // @Example toUser {"query": "to:getmasafi", "count": 10} + // @Example language {"query": "Masa lang:en", "count": 10} + // @Example dateRange {"query": "Masa since:2021-01-01 until:2021-12-31", "count": 10} + // @Example excludeRetweets {"query": "Masa -filter:retweets", "count": 10} + // @Example minLikes {"query": "Masa min_faves:100", "count": 10} + // @Example minRetweets {"query": "Masa min_retweets:50", "count": 10} + // @Example keywordExclusion {"query": "Masa -moon", "count": 10} + // @Example orOperator {"query": "Masa OR Oracle", "count": 10} + // @Example geoLocation {"query": "Masa geocode:37.781157,-122.398720,1mi", "count": 10} + // @Example urlInclusion {"query": "url:\"http://example.com\"", "count": 10} + // @Example questionFilter {"query": "Masa ?", "count": 10} + 
// @Example safeSearch {"query": "Masa filter:safe", "count": 10} v1.POST("/data/twitter/tweets/recent", API.SearchTweetsRecent()) // @Summary Twitter Trends diff --git a/pkg/db/operations.go b/pkg/db/operations.go index 1fe21167..ac59c389 100644 --- a/pkg/db/operations.go +++ b/pkg/db/operations.go @@ -105,9 +105,12 @@ func ReadData(node *masa.OracleNode, key string) ([]byte, error) { // SendToS3 sends a payload to an S3-compatible API. // -// @param {string} uid - The unique identifier for the payload. -// @param {map[string]string} payload - The payload to be sent, represented as a map of key-value pairs. -// @returns {error} - Returns an error if the operation fails, otherwise returns nil. +// Parameters: +// - uid: The unique identifier for the payload. +// - payload: The payload to be sent, represented as a map of key-value pairs. +// +// Returns: +// - error: Returns an error if the operation fails, otherwise returns nil. func SendToS3(uid string, payload map[string]string) error { apiURL := os.Getenv("API_URL") diff --git a/pkg/scrapers/web/web.go b/pkg/scrapers/web/web.go index 7fb1344b..07cd40ec 100644 --- a/pkg/scrapers/web/web.go +++ b/pkg/scrapers/web/web.go @@ -32,19 +32,29 @@ type CollectedData struct { // ScrapeWebDataForSentiment initiates the scraping process for the given list of URIs. // It returns a CollectedData struct containing the scraped sections from each URI, // and an error if any occurred during the scraping process. 
-// Usage: -// @param uri: string - url to scrape -// @param depth: int - depth of how many subpages to scrape -// @param model: string - model to use for sentiment analysis +// +// Parameters: +// - uris: []string - list of URLs to scrape +// - depth: int - depth of how many subpages to scrape +// - model: string - model to use for sentiment analysis +// +// Returns: +// - string: Scraped data +// - string: Sentiment analysis result +// - error: Any error that occurred during the process +// // Example: // // go func() { -// res, err := scraper.ScrapeWebDataForSentiment([]string{"https://en.wikipedia.org/wiki/Maize"}, 5) +// data, sentiment, err := ScrapeWebDataForSentiment([]string{"https://en.wikipedia.org/wiki/Maize"}, 5, "gpt-3.5-turbo") // if err != nil { -// logrus.Errorf("[-] Error collecting data: %s", err.Error()) -// return -// } -// logrus.Infof("%+v", res) +// logrus.WithError(err).Error("Failed to collect data") +// return +// } +// logrus.WithFields(logrus.Fields{ +// "data": data, +// "sentiment": sentiment, +// }).Info("Scraping and sentiment analysis completed") // }() func ScrapeWebDataForSentiment(uri []string, depth int, model string) (string, string, error) { var collectedData CollectedData @@ -141,18 +151,24 @@ func ScrapeWebDataForSentiment(uri []string, depth int, model string) (string, s // ScrapeWebData initiates the scraping process for the given list of URIs. // It returns a CollectedData struct containing the scraped sections from each URI, // and an error if any occurred during the scraping process. 
-// Usage: -// @param uri: string - url to scrape -// @param depth: int - depth of how many subpages to scrape -// Example: +// +// Parameters: +// - uri: []string - list of URLs to scrape +// - depth: int - depth of how many subpages to scrape +// +// Returns: +// - []byte - JSON representation of the collected data +// - error - any error that occurred during the scraping process +// +// Example usage: // // go func() { -// res, err := scraper.ScrapeWebDataForSentiment([]string{"https://en.wikipedia.org/wiki/Maize"}, 5) +// res, err := scraper.ScrapeWebData([]string{"https://en.wikipedia.org/wiki/Maize"}, 5) // if err != nil { -// logrus.Errorf("[-] Error collecting data: %s", err.Error()) -// return -// } -// logrus.Infof("%+v", res) +// logrus.WithError(err).Error("Error collecting data") +// return +// } +// logrus.WithField("result", string(res)).Info("Scraping completed") // }() func ScrapeWebData(uri []string, depth int) ([]byte, error) { // Set default depth to 1 if 0 is provided From 54e108d5ae695a7bedb1465fac84137fd596daf4 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 18:19:00 -0700 Subject: [PATCH 2/8] feat: add event tracking and analytics package - Implement EventTracker for in-memory event management - Create EventClient for sending events to external API - Add configuration options with sensible defaults - Provide convenience methods for common event types - Integrate logrus for comprehensive error handling and logging - Ensure thread-safety with mutex locks --- pkg/event/config.go | 33 +++++++++++ pkg/event/event.go | 111 +++++++++++++++++++++++++++++++++++++ pkg/event/event_client.go | 58 +++++++++++++++++++ pkg/event/event_library.go | 16 ++++++ 4 files changed, 218 insertions(+) create mode 100644 pkg/event/config.go create mode 100644 pkg/event/event.go create mode 100644 pkg/event/event_client.go create mode 100644 pkg/event/event_library.go diff --git 
a/pkg/event/config.go b/pkg/event/config.go new file mode 100644 index 00000000..de5bfa2a --- /dev/null +++ b/pkg/event/config.go @@ -0,0 +1,33 @@ +package event + +import "time" + +const ( + // APIVersion is the version of the analytics API + APIVersion = "v1" + + // DefaultBaseURL is the default URL for the external API + DefaultBaseURL = "https://api.example.com/analytics" + + // DefaultHTTPTimeout is the default timeout for HTTP requests + DefaultHTTPTimeout = 10 * time.Second + + // MaxEventsInMemory is the maximum number of events to keep in memory + MaxEventsInMemory = 1000 +) + +// Config holds the configuration for the analytics package +type Config struct { + BaseURL string + HTTPTimeout time.Duration + LogLevel string +} + +// DefaultConfig returns the default configuration +func DefaultConfig() *Config { + return &Config{ + BaseURL: DefaultBaseURL, + HTTPTimeout: DefaultHTTPTimeout, + LogLevel: "info", + } +} diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 00000000..7fd7f44c --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,111 @@ +package event + +import ( + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +type Event struct { + Name string + Timestamp time.Time + Data map[string]interface{} +} + +type EventTracker struct { + events []Event + mu sync.Mutex + logger *logrus.Logger + config *Config + apiClient *EventClient +} + +func NewEventTracker(config *Config) *EventTracker { + if config == nil { + config = DefaultConfig() + } + logger := logrus.New() + logger.SetLevel(logrus.InfoLevel) + if level, err := logrus.ParseLevel(config.LogLevel); err == nil { + logger.SetLevel(level) + } + return &EventTracker{ + events: make([]Event, 0), + logger: logger, + config: config, + apiClient: NewEventClient(config.BaseURL, logger, config.HTTPTimeout), + } +} + +func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) { + if a == nil { + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + event := 
Event{ + Name: name, + Timestamp: time.Now(), + Data: data, + } + + a.events = append(a.events, event) + a.logger.WithFields(logrus.Fields{ + "event_name": name, + "data": data, + }).Info("Event tracked") +} + +func (a *EventTracker) GetEvents() []Event { + if a == nil { + return nil + } + + a.mu.Lock() + defer a.mu.Unlock() + + return append([]Event{}, a.events...) +} + +func (a *EventTracker) ClearEvents() { + if a == nil { + return + } + + a.mu.Lock() + defer a.mu.Unlock() + + a.events = make([]Event, 0) + a.logger.Info("Events cleared") +} + +func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error { + if a == nil { + return fmt.Errorf("analytics is nil") + } + + a.mu.Lock() + defer a.mu.Unlock() + + event := Event{ + Name: name, + Timestamp: time.Now(), + Data: data, + } + + a.events = append(a.events, event) + a.logger.WithFields(logrus.Fields{ + "event_name": name, + "data": data, + }).Info("Event tracked") + + if client != nil { + return client.SendEvent(event) + } + + return nil +} diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go new file mode 100644 index 00000000..0bf1e9dd --- /dev/null +++ b/pkg/event/event_client.go @@ -0,0 +1,58 @@ +package event + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/sirupsen/logrus" +) + +type EventClient struct { + BaseURL string + HTTPClient *http.Client + Logger *logrus.Logger +} + +func NewEventClient(baseURL string, logger *logrus.Logger, timeout time.Duration) *EventClient { + return &EventClient{ + BaseURL: baseURL, + HTTPClient: &http.Client{Timeout: timeout}, + Logger: logger, + } +} + +func (c *EventClient) SendEvent(event Event) error { + if c == nil { + return fmt.Errorf("EventClient is nil") + } + + url := fmt.Sprintf("%s/%s/events", c.BaseURL, APIVersion) + payload, err := json.Marshal(event) + if err != nil { + c.Logger.WithError(err).Error("Failed to marshal event") + return err + } + + resp, err := 
c.HTTPClient.Post(url, "application/json", bytes.NewBuffer(payload)) + if err != nil { + c.Logger.WithError(err).Error("Failed to send event") + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + err = fmt.Errorf("event service returned non-OK status: %d", resp.StatusCode) + c.Logger.WithError(err).Error("Failed to send event") + return err + } + + c.Logger.WithFields(logrus.Fields{ + "event_name": event.Name, + "timestamp": event.Timestamp, + }).Info("Event sent") + + return nil +} diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go new file mode 100644 index 00000000..7f75cdb9 --- /dev/null +++ b/pkg/event/event_library.go @@ -0,0 +1,16 @@ +package event + +func (a *EventTracker) TrackUserLogin(userID string, client *EventClient) error { + return a.TrackAndSendEvent("user_login", map[string]interface{}{"user_id": userID}, client) +} + +func (a *EventTracker) TrackPageView(pageURL string, client *EventClient) error { + return a.TrackAndSendEvent("page_view", map[string]interface{}{"url": pageURL}, client) +} + +func (a *EventTracker) TrackPurchase(productID string, amount float64, client *EventClient) error { + return a.TrackAndSendEvent("purchase", map[string]interface{}{ + "product_id": productID, + "amount": amount, + }, client) +} From e4845277ec438dafd06ccfbad6db4a7922a7219b Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 18:19:50 -0700 Subject: [PATCH 3/8] chore: update event_library and add todos --- pkg/event/event_library.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go index 7f75cdb9..5b4f073d 100644 --- a/pkg/event/event_library.go +++ b/pkg/event/event_library.go @@ -1,14 +1,27 @@ package event +import "fmt" + +// TODO: Define our events properly. These are placeholders and should be replaced with actual event definitions. 
+ func (a *EventTracker) TrackUserLogin(userID string, client *EventClient) error { + if a == nil { + return fmt.Errorf("EventTracker is nil") + } return a.TrackAndSendEvent("user_login", map[string]interface{}{"user_id": userID}, client) } func (a *EventTracker) TrackPageView(pageURL string, client *EventClient) error { + if a == nil { + return fmt.Errorf("EventTracker is nil") + } return a.TrackAndSendEvent("page_view", map[string]interface{}{"url": pageURL}, client) } func (a *EventTracker) TrackPurchase(productID string, amount float64, client *EventClient) error { + if a == nil { + return fmt.Errorf("EventTracker is nil") + } return a.TrackAndSendEvent("purchase", map[string]interface{}{ "product_id": productID, "amount": amount, From 5b9b27b85caa6ce1eaa7a0764cb33f8bc17b4ede Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 18:46:56 -0700 Subject: [PATCH 4/8] feat: event tracking package with library --- pkg/event/README.md | 44 +++++++++++++++++++ pkg/event/event_library.go | 86 +++++++++++++++++++++++++++++++------- 2 files changed, 116 insertions(+), 14 deletions(-) create mode 100644 pkg/event/README.md diff --git a/pkg/event/README.md b/pkg/event/README.md new file mode 100644 index 00000000..58bb7425 --- /dev/null +++ b/pkg/event/README.md @@ -0,0 +1,44 @@ +# Masa Protocol Event Tracking Package + +A Go package for tracking and sending analytics events. 
+ +## Features + +- In-memory event storage +- Configurable event sending to external API +- Thread-safe operations +- Comprehensive logging with logrus +- Convenience methods for common event types + +## Usage + +```go +import "github.com/masa-finance/masa-oracle/pkg/event" + +// Create a new event tracker with default config +tracker := event.NewEventTracker(nil) + +// Track a custom event +tracker.TrackEvent("custom_event", map[string]interface{}{"key": "value"}) + +// Use convenience method to track and send a login event +client := event.NewEventClient("https://api.example.com", logger, 10*time.Second) +err := tracker.TrackUserLogin("user123", client) +if err != nil { + log.Fatal(err) +} + +// Retrieve all tracked events +events := tracker.GetEvents() + +// Clear all tracked events +tracker.ClearEvents() +``` + +## Contributing + +Contributions are welcome! Please submit a pull request or create an issue for any bugs or feature requests. + +## License + +This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details. \ No newline at end of file diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go index 5b4f073d..30e86867 100644 --- a/pkg/event/event_library.go +++ b/pkg/event/event_library.go @@ -1,29 +1,87 @@ package event -import "fmt" +import ( + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" + "github.com/sirupsen/logrus" +) -// TODO: Define our events properly. These are placeholders and should be replaced with actual event definitions. - -func (a *EventTracker) TrackUserLogin(userID string, client *EventClient) error { +// TrackWorkDistribution records the distribution of work to a worker. 
+// +// Parameters: +// - "peer_id": String containing the peer ID +// - workType: The type of work being distributed (e.g., Twitter, Web, Discord) +// - remoteWorker: Boolean indicating if the work is sent to a remote worker (true) or executed locally (false) +// - client: The EventClient used to send the event +// +// Returns: +// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// +// The event will contain the following data: +// - "peer_id": String containing the peer ID +// - "work_type": The WorkerType as a string +// - "remote_worker": Boolean indicating if it's a remote worker +func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string, client *EventClient) error { if a == nil { - return fmt.Errorf("EventTracker is nil") + logrus.Error("EventTracker is nil") + return nil } - return a.TrackAndSendEvent("user_login", map[string]interface{}{"user_id": userID}, client) + return a.TrackAndSendEvent("work_distribution", map[string]interface{}{ + "peer_id": peerId, + "work_type": workType, + "remote_worker": remoteWorker, + }, client) } -func (a *EventTracker) TrackPageView(pageURL string, client *EventClient) error { +// TrackWorkCompletion records the completion of a work item. 
+// +// Parameters: +// - "peer_id": String containing the peer ID +// - workType: The type of work that was completed +// - success: Boolean indicating if the work was completed successfully +// - client: The EventClient used to send the event +// +// Returns: +// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// +// The event will contain the following data: +// - "peer_id": String containing the peer ID +// - "work_type": The WorkerType as a string +// - "success": Boolean indicating if the work was successful +func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string, client *EventClient) error { if a == nil { - return fmt.Errorf("EventTracker is nil") + logrus.Error("EventTracker is nil") + return nil } - return a.TrackAndSendEvent("page_view", map[string]interface{}{"url": pageURL}, client) + return a.TrackAndSendEvent("work_completion", map[string]interface{}{ + "peer_id": peerId, + "work_type": workType, + "success": success, + }, client) } -func (a *EventTracker) TrackPurchase(productID string, amount float64, client *EventClient) error { +// TrackWorkerFailure records a failure that occurred during work execution. 
+// +// Parameters: +// - "peer_id": String containing the peer ID +// - workType: The type of work that failed +// - errorMessage: A string describing the error that occurred +// - client: The EventClient used to send the event +// +// Returns: +// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// +// The event will contain the following data: +// - "peer_id": String containing the peer ID +// - "work_type": The WorkerType as a string +// - "error": String containing the error message +func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string, client *EventClient) error { if a == nil { - return fmt.Errorf("EventTracker is nil") + logrus.Error("EventTracker is nil") + return nil } - return a.TrackAndSendEvent("purchase", map[string]interface{}{ - "product_id": productID, - "amount": amount, + return a.TrackAndSendEvent("worker_failure", map[string]interface{}{ + "peer_id": peerId, + "work_type": workType, + "error": errorMessage, }, client) } From 5167fc634691be9f3da0e9b6e88e5b043b1624e2 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 19:08:32 -0700 Subject: [PATCH 5/8] chore: update readme for event tracking package ready for implementation --- pkg/event/README.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/pkg/event/README.md b/pkg/event/README.md index 58bb7425..30b9df96 100644 --- a/pkg/event/README.md +++ b/pkg/event/README.md @@ -35,6 +35,43 @@ events := tracker.GetEvents() tracker.ClearEvents() ``` +## Event Library + +The package provides a set of predefined events for common scenarios: + +### Work Distribution + +```go +func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string, client *EventClient) error +``` + +Tracks the distribution of work to a worker. 
Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `remote_worker`: Boolean indicating if it's a remote worker + +### Work Completion + +```go +func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string, client *EventClient) error +``` + +Records the completion of a work item. Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `success`: Boolean indicating if the work was successful + +### Worker Failure + +```go +func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string, client *EventClient) error +``` + +Records a failure that occurred during work execution. Event data includes: +- `peer_id`: String containing the peer ID +- `work_type`: The WorkerType as a string +- `error`: String containing the error message + ## Contributing Contributions are welcome! Please submit a pull request or create an issue for any bugs or feature requests. 
From 798cee845f71b84aa950fe0edd3be2bbbabe2589 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Sun, 18 Aug 2024 19:51:56 -0700 Subject: [PATCH 6/8] fix: timestamps in UTC --- pkg/event/event.go | 6 +++--- pkg/event/event_client.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 7fd7f44c..48a69de2 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -10,7 +10,7 @@ import ( type Event struct { Name string - Timestamp time.Time + Timestamp time.Time `json:"timestamp"` Data map[string]interface{} } @@ -49,7 +49,7 @@ func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) { event := Event{ Name: name, - Timestamp: time.Now(), + Timestamp: time.Now().UTC(), Data: data, } @@ -93,7 +93,7 @@ func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{ event := Event{ Name: name, - Timestamp: time.Now(), + Timestamp: time.Now().UTC(), Data: data, } diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go index 0bf1e9dd..a0e98206 100644 --- a/pkg/event/event_client.go +++ b/pkg/event/event_client.go @@ -51,7 +51,7 @@ func (c *EventClient) SendEvent(event Event) error { c.Logger.WithFields(logrus.Fields{ "event_name": event.Name, - "timestamp": event.Timestamp, + "timestamp": event.Timestamp.UTC(), }).Info("Event sent") return nil From da2292406995b41f1655537ba106d4d5da3ed446 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Mon, 19 Aug 2024 12:57:45 -0700 Subject: [PATCH 7/8] feat: Improve error handling in TrackAndSendEvent function - Add error check for nil EventClient - Log SendEvent errors - Return SendEvent errors to caller - Clarify nil EventTracker error message --- pkg/event/event.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 48a69de2..3efd8991 100644 --- 
a/pkg/event/event.go +++ b/pkg/event/event.go @@ -85,7 +85,7 @@ func (a *EventTracker) ClearEvents() { func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error { if a == nil { - return fmt.Errorf("analytics is nil") + return fmt.Errorf("event tracker is nil") } a.mu.Lock() @@ -103,9 +103,13 @@ func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{ "data": data, }).Info("Event tracked") - if client != nil { - return client.SendEvent(event) + if client == nil { + return fmt.Errorf("no client available") } - return nil + err := client.SendEvent(event) + if err != nil { + a.logger.WithError(err).Error("Failed to send event") + } + return err } From 235730e3aa496445e61656df8e76ddb3e10798f1 Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Tue, 20 Aug 2024 10:45:06 -0700 Subject: [PATCH 8/8] adding the event tracker handling to the work distribution. --- pkg/event/event.go | 32 +++--- pkg/event/event_client.go | 8 +- pkg/event/event_library.go | 192 +++++++++++++++++++++++++++------- pkg/workers/worker_manager.go | 37 +++++-- 4 files changed, 213 insertions(+), 56 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 3efd8991..19141750 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -8,6 +8,19 @@ import ( "github.com/sirupsen/logrus" ) +const ( + WorkCompletion = "work_completion" + WorkFailure = "worker_failure" + WorkDistribution = "work_distribution" + WorkExecutionStart = "work_execution_start" + WorkExecutionTimeout = "work_execution_timeout" + RemoteWorkerConnection = "remote_work_connection" + StreamCreation = "stream_creation" + WorkRequestSerialization = "work_request_serialized" + WorkResponseDeserialization = "work_response_serialized" + LocalWorkerFallback = "local_work_executed" +) + type Event struct { Name string Timestamp time.Time `json:"timestamp"` @@ -84,10 +97,6 @@ func (a *EventTracker) 
ClearEvents() { } func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error { - if a == nil { - return fmt.Errorf("event tracker is nil") - } - a.mu.Lock() defer a.mu.Unlock() @@ -103,13 +112,12 @@ func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{ "data": data, }).Info("Event tracked") - if client == nil { - return fmt.Errorf("no client available") - } - - err := client.SendEvent(event) - if err != nil { - a.logger.WithError(err).Error("Failed to send event") + if client != nil { + return client.SendEvent(event) + } else { + if a.apiClient != nil { + return a.apiClient.SendEvent(event) + } } - return err + return fmt.Errorf("no client available") } diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go index a0e98206..71aeb787 100644 --- a/pkg/event/event_client.go +++ b/pkg/event/event_client.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "time" @@ -41,7 +42,12 @@ func (c *EventClient) SendEvent(event Event) error { c.Logger.WithError(err).Error("Failed to send event") return err } - defer resp.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + + } + }(resp.Body) if resp.StatusCode != http.StatusOK { err = fmt.Errorf("event service returned non-OK status: %d", resp.StatusCode) diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go index 30e86867..b6664751 100644 --- a/pkg/event/event_library.go +++ b/pkg/event/event_library.go @@ -1,87 +1,205 @@ package event import ( - data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" + "time" + "github.com/sirupsen/logrus" + + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) // TrackWorkDistribution records the distribution of work to a worker. 
// // Parameters: -// - "peer_id": String containing the peer ID // - workType: The type of work being distributed (e.g., Twitter, Web, Discord) // - remoteWorker: Boolean indicating if the work is sent to a remote worker (true) or executed locally (false) -// - client: The EventClient used to send the event -// -// Returns: -// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// - peerId: String containing the peer ID // // The event will contain the following data: // - "peer_id": String containing the peer ID // - "work_type": The WorkerType as a string // - "remote_worker": Boolean indicating if it's a remote worker -func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string, client *EventClient) error { - if a == nil { - logrus.Error("EventTracker is nil") - return nil - } - return a.TrackAndSendEvent("work_distribution", map[string]interface{}{ +func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string) { + err := a.TrackAndSendEvent(WorkDistribution, map[string]interface{}{ "peer_id": peerId, "work_type": workType, "remote_worker": remoteWorker, - }, client) + }, nil) + if err != nil { + logrus.Errorf("error tracking work distribution event: %s", err) + } } // TrackWorkCompletion records the completion of a work item. 
// // Parameters: -// - "peer_id": String containing the peer ID // - workType: The type of work that was completed // - success: Boolean indicating if the work was completed successfully -// - client: The EventClient used to send the event -// -// Returns: -// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// - peerId: String containing the peer ID // // The event will contain the following data: // - "peer_id": String containing the peer ID // - "work_type": The WorkerType as a string // - "success": Boolean indicating if the work was successful -func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string, client *EventClient) error { - if a == nil { - logrus.Error("EventTracker is nil") - return nil - } - return a.TrackAndSendEvent("work_completion", map[string]interface{}{ +func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string) { + err := a.TrackAndSendEvent(WorkCompletion, map[string]interface{}{ "peer_id": peerId, "work_type": workType, "success": success, - }, client) + }, nil) + if err != nil { + logrus.Errorf("error tracking work completion event: %s", err) + } } // TrackWorkerFailure records a failure that occurred during work execution. 
// // Parameters: -// - "peer_id": String containing the peer ID // - workType: The type of work that failed // - errorMessage: A string describing the error that occurred -// - client: The EventClient used to send the event -// -// Returns: -// - error: nil if the event was successfully tracked and sent, otherwise an error describing what went wrong +// - peerId: String containing the peer ID // // The event will contain the following data: // - "peer_id": String containing the peer ID // - "work_type": The WorkerType as a string // - "error": String containing the error message -func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string, client *EventClient) error { - if a == nil { - logrus.Error("EventTracker is nil") - return nil - } - return a.TrackAndSendEvent("worker_failure", map[string]interface{}{ +func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string) { + err := a.TrackAndSendEvent(WorkFailure, map[string]interface{}{ "peer_id": peerId, "work_type": workType, "error": errorMessage, - }, client) + }, nil) + if err != nil { + logrus.Errorf("error tracking worker failure event: %s", err) + } +} + +// TrackWorkExecutionStart records the start of work execution. 
+// +// Parameters: +// - workType: The type of work being executed +// - remoteWorker: Boolean indicating if the work is executed by a remote worker (true) or locally (false) +// - peerId: String containing the peer ID +// +// The event will contain the following data: +// - "work_type": The WorkerType as a string +// - "remote_worker": Boolean indicating if it's a remote worker +// - "peer_id": String containing the peer ID +func (a *EventTracker) TrackWorkExecutionStart(workType data_types.WorkerType, remoteWorker bool, peerId string) { + err := a.TrackAndSendEvent(WorkExecutionStart, map[string]interface{}{ + "work_type": workType, + "remote_worker": remoteWorker, + "peer_id": peerId, + }, nil) + if err != nil { + logrus.Errorf("error tracking work execution start event: %s", err) + } +} + +// TrackWorkExecutionTimeout records when work execution times out. +// +// Parameters: +// - workType: The type of work that timed out +// - timeoutDuration: The duration of the timeout +// +// The event will contain the following data: +// - "work_type": The WorkerType as a string +// - "timeout_duration": The duration of the timeout +func (a *EventTracker) TrackWorkExecutionTimeout(workType data_types.WorkerType, timeoutDuration time.Duration) { + err := a.TrackAndSendEvent(WorkExecutionTimeout, map[string]interface{}{ + "work_type": workType, + "timeout_duration": timeoutDuration, + }, nil) + if err != nil { + logrus.Errorf("error tracking work execution timeout event: %s", err) + } +} + +// TrackRemoteWorkerConnection records when a connection is established with a remote worker. 
+// +// Parameters: +// - peerId: String containing the peer ID +// +// The event will contain the following data: +// - "peer_id": String containing the peer ID +func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) { + err := a.TrackAndSendEvent(RemoteWorkerConnection, map[string]interface{}{ + "peer_id": peerId, + }, nil) + if err != nil { + logrus.Errorf("error tracking remote worker connection event: %s", err) + } +} + +// TrackStreamCreation records when a new stream is created for communication with a remote worker. +// +// Parameters: +// - peerId: String containing the peer ID +// - protocol: The protocol used for the stream +// +// The event will contain the following data: +// - "peer_id": String containing the peer ID +// - "protocol": The protocol used for the stream +func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) { + err := a.TrackAndSendEvent(StreamCreation, map[string]interface{}{ + "peer_id": peerId, + "protocol": protocol, + }, nil) + if err != nil { + logrus.Errorf("error tracking stream creation event: %s", err) + } +} + +// TrackWorkRequestSerialization records when a work request is serialized for transmission. +// +// Parameters: +// - workType: The type of work being serialized +// - dataSize: The size of the serialized data +// +// The event will contain the following data: +// - "work_type": The WorkerType as a string +// - "data_size": The size of the serialized data +func (a *EventTracker) TrackWorkRequestSerialization(workType data_types.WorkerType, dataSize int) { + err := a.TrackAndSendEvent(WorkRequestSerialization, map[string]interface{}{ + "work_type": workType, + "data_size": dataSize, + }, nil) + if err != nil { + logrus.Errorf("error tracking work request serialization event: %s", err) + } +} + +// TrackWorkResponseDeserialization records when a work response is deserialized after reception. 
+// +// Parameters: +// - workType: The type of work being deserialized +// - success: Boolean indicating if the deserialization was successful +// +// The event will contain the following data: +// - "work_type": The WorkerType as a string +// - "success": Boolean indicating if the deserialization was successful +func (a *EventTracker) TrackWorkResponseDeserialization(workType data_types.WorkerType, success bool) { + err := a.TrackAndSendEvent(WorkResponseDeserialization, map[string]interface{}{ + "work_type": workType, + "success": success, + }, nil) + if err != nil { + logrus.Errorf("error tracking work response deserialization event: %s", err) + } +} + +// TrackLocalWorkerFallback records when the system falls back to using a local worker. +// +// Parameters: +// - reason: The reason for the fallback +// +// The event will contain the following data: +// - "reason": The reason for the fallback +func (a *EventTracker) TrackLocalWorkerFallback(reason string) { + err := a.TrackAndSendEvent(LocalWorkerFallback, map[string]interface{}{ + "reason": reason, + }, nil) + if err != nil { + logrus.Errorf("error tracking local worker fallback event: %s", err) + } } diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index d4966cc6..e50be20d 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -15,6 +15,7 @@ import ( masa "github.com/masa-finance/masa-oracle/pkg" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/event" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -27,7 +28,8 @@ var ( func GetWorkHandlerManager() *WorkHandlerManager { once.Do(func() { instance = &WorkHandlerManager{ - handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), + handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), + eventTracker: event.NewEventTracker(nil), } instance.setupHandlers() }) @@ -51,8 +53,9 @@ 
type WorkHandlerInfo struct { // WorkHandlerManager manages work handlers and tracks their execution metrics. type WorkHandlerManager struct { - handlers map[data_types.WorkerType]*WorkHandlerInfo - mu sync.RWMutex + handlers map[data_types.WorkerType]*WorkHandlerInfo + mu sync.RWMutex + eventTracker *event.EventTracker } func (whm *WorkHandlerManager) setupHandlers() { @@ -111,14 +114,24 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers) response = whm.sendWorkToWorker(node, worker, workRequest) if response.Error != "" { + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) logrus.Errorf("error sending work to worker: %s: %s", response.WorkerPeerId, response.Error) logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId) continue } + whm.eventTracker.TrackWorkCompletion(workRequest.WorkType, response.Error == "", worker.AddrInfo.ID.String()) return response } // Fallback to local execution if local worker is eligible if localWorker != nil { + var reason string + if len(remoteWorkers) > 0 { + reason = "all remote workers failed" + } else { + reason = "no remote workers available" + } + whm.eventTracker.TrackLocalWorkerFallback(reason) + whm.eventTracker.TrackWorkExecutionStart(workRequest.WorkType, false, localWorker.AddrInfo.ID.String()) return whm.ExecuteWork(workRequest) } if response.Error == "" { @@ -135,21 +148,26 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da if err := node.Host.Connect(ctxWithTimeout, *worker.AddrInfo); err != nil { response.Error = fmt.Sprintf("failed to connect to remote peer %s: %v", worker.AddrInfo.ID.String(), err) + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) return } else { + 
whm.eventTracker.TrackRemoteWorkerConnection(worker.AddrInfo.ID.String()) logrus.Debugf("[+] Connection established with node: %s", worker.AddrInfo.ID.String()) - stream, err := node.Host.NewStream(ctxWithTimeout, worker.AddrInfo.ID, config.ProtocolWithVersion(config.WorkerProtocol)) + protocol := config.ProtocolWithVersion(config.WorkerProtocol) + stream, err := node.Host.NewStream(ctxWithTimeout, worker.AddrInfo.ID, protocol) if err != nil { response.Error = fmt.Sprintf("error opening stream: %v", err) + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) return } // the stream should be closed by the receiver, but keeping this here just in case + whm.eventTracker.TrackStreamCreation(worker.AddrInfo.ID.String(), string(protocol)) defer func(stream network.Stream) { err := stream.Close() if err != nil { logrus.Debugf("[-] Error closing stream: %s", err) } - }(stream) // Close the stream when done + }(stream) // Close the stream when done.S // Write the request to the stream with length prefix bytes, err := json.Marshal(workRequest) @@ -164,17 +182,20 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da response.Error = fmt.Sprintf("error writing length to stream: %v", err) return } + whm.eventTracker.TrackWorkRequestSerialization(workRequest.WorkType, len(bytes)) _, err = stream.Write(bytes) if err != nil { response.Error = fmt.Sprintf("error writing to stream: %v", err) + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) return } - + whm.eventTracker.TrackWorkDistribution(workRequest.WorkType, true, worker.AddrInfo.ID.String()) // Read the response length lengthBuf = make([]byte, 4) _, err = io.ReadFull(stream, lengthBuf) if err != nil { response.Error = fmt.Sprintf("error reading response length: %v", err) + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) return } 
responseLength := binary.BigEndian.Uint32(lengthBuf) @@ -191,6 +212,7 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da response.Error = fmt.Sprintf("error unmarshaling response: %v", err) return } + whm.eventTracker.TrackWorkResponseDeserialization(workRequest.WorkType, true) } return response } @@ -228,6 +250,8 @@ func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) ( select { case <-ctx.Done(): // Context timed out + whm.eventTracker.TrackWorkExecutionTimeout(workRequest.WorkType, workerConfig.WorkerResponseTimeout) + return data_types.WorkResponse{Error: "work execution timed out"} case response = <-responseChan: // Work completed within the timeout @@ -267,6 +291,7 @@ func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream) { return } peerId := stream.Conn().LocalPeer().String() + whm.eventTracker.TrackWorkExecutionStart(workRequest.WorkType, true, peerId) workResponse := whm.ExecuteWork(workRequest) if workResponse.Error != "" { logrus.Errorf("error from remote worker %s: executing work: %s", peerId, workResponse.Error)