Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event tracking for work distribution #525

Merged
merged 25 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
54e108d
feat: add event tracking and analytics package
teslashibe Aug 19, 2024
e484527
chore: upadte event_library and add todos
teslashibe Aug 19, 2024
5b9b27b
feat: event tracking package with library
teslashibe Aug 19, 2024
5167fc6
chore: update readme for event tracking package ready for implementation
teslashibe Aug 19, 2024
798cee8
fix: timestamps in UTC
teslashibe Aug 19, 2024
da22924
feat: Improve error handling in TrackAndSendEvent function
teslashibe Aug 19, 2024
235730e
adding the event tracker handling to the work distribution.
restevens402 Aug 20, 2024
008c9cb
chore: add API local base URL and tidy
teslashibe Aug 22, 2024
9ba7235
update go.mod
restevens402 Aug 23, 2024
86f3546
fix go.mod unused import
restevens402 Aug 23, 2024
5b794e7
Squashed commit of the following:
restevens402 Aug 23, 2024
15e3e5f
Add AddrInfo to localWorker in worker_selection.go
restevens402 Aug 23, 2024
fbbd1c6
Refactor event tracking to include peer ID and other details
restevens402 Aug 23, 2024
e892096
Merge branch 'main' into feat-event-tracking-for-work-distribution
restevens402 Aug 23, 2024
333951c
feat : Improve event tracking with TrackWorkRequest with Host peer ID…
teslashibe Aug 24, 2024
68896ef
feat (event): Add DataSource and Payload to event tracking
teslashibe Aug 26, 2024
e0440b9
refactor: Move event timestamp handling to server side
teslashibe Aug 26, 2024
5825d5c
Merge branch 'main' into feat-event-tracking-for-work-distribution
restevens402 Sep 3, 2024
b6df8e4
feat(api): enhance work request tracking for SearchTweetsRecent
teslashibe Sep 4, 2024
0ed400b
feat(api): remove Twitter trends endpoint and handler
teslashibe Sep 4, 2024
43379e6
refactor(api): remove sentiment analysis and LLM model endpoints
teslashibe Sep 4, 2024
4dd6589
feat(api): add event tracking for Twitter-related endpoints
teslashibe Sep 4, 2024
2b08943
Streamlining stream closures in OracleNode methods
restevens402 Sep 4, 2024
d1e5d61
Update event_library.go
teslashibe Sep 5, 2024
be61ffe
Restore event tracking for worker failures and task completion
restevens402 Sep 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22.0

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0
github.com/dgraph-io/badger v1.6.2
github.com/ethereum/go-ethereum v1.14.8
Expand All @@ -24,6 +23,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.9
github.com/libp2p/go-libp2p v0.34.0
github.com/libp2p/go-libp2p-core v0.20.1
restevens402 marked this conversation as resolved.
Show resolved Hide resolved
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/libp2p/go-libp2p-pubsub v0.11.0
github.com/masa-finance/masa-twitter-scraper v0.0.0-20240515201201-b83fa3597a31
Expand Down Expand Up @@ -60,6 +60,7 @@ require (
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chyeh/pubip v0.0.0-20170203095919-b7e679cf541c // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ github.com/libp2p/go-libp2p v0.34.0 h1:J+SL3DMz+zPz06OHSRt42GKA5n5hmwgY1l7ckLUz3
github.com/libp2p/go-libp2p v0.34.0/go.mod h1:snyJQix4ET6Tj+LeI0VPjjxTtdWpeOhYt5lEY0KirkQ=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw=
github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY=
github.com/libp2p/go-libp2p-kad-dht v0.25.2 h1:FOIk9gHoe4YRWXTu8SY9Z1d0RILol0TrtApsMDPjAVQ=
github.com/libp2p/go-libp2p-kad-dht v0.25.2/go.mod h1:6za56ncRHYXX4Nc2vn8z7CZK0P4QiMcrn77acKLM2Oo=
github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0=
Expand Down
81 changes: 81 additions & 0 deletions pkg/event/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Masa Protocol Event Tracking Package

A Go package for tracking and sending analytics events.

## Features

- In-memory event storage
- Configurable event sending to external API
- Thread-safe operations
- Comprehensive logging with logrus
- Convenience methods for common event types

## Usage

```go
import "github.com/masa-finance/masa-oracle/pkg/event"

// Create a new event tracker with default config
tracker := event.NewEventTracker(nil)

// Track a custom event
tracker.TrackEvent("custom_event", map[string]interface{}{"key": "value"})

// Use convenience method to track and send a login event
client := event.NewEventClient("https://api.example.com", logger, 10*time.Second)
err := tracker.TrackUserLogin("user123", client)
if err != nil {
log.Fatal(err)
}

// Retrieve all tracked events
events := tracker.GetEvents()

// Clear all tracked events
tracker.ClearEvents()
```

## Event Library

The package provides a set of predefined events for common scenarios:

### Work Distribution

```go
func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string, client *EventClient) error
```

Tracks the distribution of work to a worker. Event data includes:
- `peer_id`: String containing the peer ID
- `work_type`: The WorkerType as a string
- `remote_worker`: Boolean indicating if it's a remote worker

### Work Completion

```go
func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string, client *EventClient) error
```

Records the completion of a work item. Event data includes:
- `peer_id`: String containing the peer ID
- `work_type`: The WorkerType as a string
- `success`: Boolean indicating if the work was successful

### Worker Failure

```go
func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string, client *EventClient) error
```

Records a failure that occurred during work execution. Event data includes:
- `peer_id`: String containing the peer ID
- `work_type`: The WorkerType as a string
- `error`: String containing the error message

## Contributing

Contributions are welcome! Please submit a pull request or create an issue for any bugs or feature requests.

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
33 changes: 33 additions & 0 deletions pkg/event/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package event

import "time"

const (
// APIVersion is the version of the analytics API
APIVersion = "v1"

// DefaultBaseURL is the default URL for the external API
DefaultBaseURL = "http://127.0.0.1:8081"

// DefaultHTTPTimeout is the default timeout for HTTP requests
DefaultHTTPTimeout = 10 * time.Second

// MaxEventsInMemory is the maximum number of events to keep in memory
MaxEventsInMemory = 1000
)

// Config holds the configuration for the analytics package
type Config struct {
BaseURL string
HTTPTimeout time.Duration
LogLevel string
}

// DefaultConfig returns the default configuration
func DefaultConfig() *Config {
return &Config{
BaseURL: DefaultBaseURL,
HTTPTimeout: DefaultHTTPTimeout,
LogLevel: "info",
}
}
123 changes: 123 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package event

import (
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
)

const (
WorkCompletion = "work_completion"
WorkFailure = "worker_failure"
WorkDistribution = "work_distribution"
WorkExecutionStart = "work_execution_start"
WorkExecutionTimeout = "work_execution_timeout"
RemoteWorkerConnection = "remote_work_connection"
StreamCreation = "stream_creation"
WorkRequestSerialization = "work_request_serialized"
WorkResponseDeserialization = "work_response_serialized"
LocalWorkerFallback = "local_work_executed"
)

type Event struct {
Name string
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{}
}

type EventTracker struct {
events []Event
mu sync.Mutex
logger *logrus.Logger
config *Config
apiClient *EventClient
}

func NewEventTracker(config *Config) *EventTracker {
if config == nil {
config = DefaultConfig()
}
logger := logrus.New()
logger.SetLevel(logrus.InfoLevel)
if level, err := logrus.ParseLevel(config.LogLevel); err == nil {
logger.SetLevel(level)
}
return &EventTracker{
events: make([]Event, 0),
logger: logger,
config: config,
apiClient: NewEventClient(config.BaseURL, logger, config.HTTPTimeout),
}
}

func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) {
if a == nil {
return
}

a.mu.Lock()
defer a.mu.Unlock()

event := Event{
Name: name,
Timestamp: time.Now().UTC(),
Data: data,
}

a.events = append(a.events, event)
a.logger.WithFields(logrus.Fields{
"event_name": name,
"data": data,
}).Info("Event tracked")
}

func (a *EventTracker) GetEvents() []Event {
if a == nil {
return nil
}

a.mu.Lock()
defer a.mu.Unlock()

return append([]Event{}, a.events...)
}

func (a *EventTracker) ClearEvents() {
if a == nil {
return
}

a.mu.Lock()
defer a.mu.Unlock()

a.events = make([]Event, 0)
a.logger.Info("Events cleared")
}

func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error {
a.mu.Lock()
defer a.mu.Unlock()

event := Event{
Name: name,
Timestamp: time.Now().UTC(),
Data: data,
}

a.events = append(a.events, event)
a.logger.WithFields(logrus.Fields{
"event_name": name,
"data": data,
}).Info("Event tracked")

if client != nil {
return client.SendEvent(event)
} else {
if a.apiClient != nil {
return a.apiClient.SendEvent(event)
}
}
return fmt.Errorf("no client available")
}
64 changes: 64 additions & 0 deletions pkg/event/event_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package event

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/sirupsen/logrus"
)

type EventClient struct {
BaseURL string
HTTPClient *http.Client
Logger *logrus.Logger
}

func NewEventClient(baseURL string, logger *logrus.Logger, timeout time.Duration) *EventClient {
return &EventClient{
BaseURL: baseURL,
HTTPClient: &http.Client{Timeout: timeout},
Logger: logger,
}
}

func (c *EventClient) SendEvent(event Event) error {
if c == nil {
return fmt.Errorf("EventClient is nil")
}

url := fmt.Sprintf("%s/%s/events", c.BaseURL, APIVersion)
payload, err := json.Marshal(event)
if err != nil {
c.Logger.WithError(err).Error("Failed to marshal event")
return err
}

resp, err := c.HTTPClient.Post(url, "application/json", bytes.NewBuffer(payload))
if err != nil {
c.Logger.WithError(err).Error("Failed to send event")
return err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {

}
}(resp.Body)

if resp.StatusCode != http.StatusOK {
err = fmt.Errorf("event service returned non-OK status: %d", resp.StatusCode)
c.Logger.WithError(err).Error("Failed to send event")
return err
}

c.Logger.WithFields(logrus.Fields{
"event_name": event.Name,
"timestamp": event.Timestamp.UTC(),
}).Info("Event sent")

return nil
}
Loading
Loading