Skip to content

Commit

Permalink
Refactor event tracking to include peer ID and other details
Browse files Browse the repository at this point in the history
Updated event tracking functions to accept additional parameters such as peer ID. Simplified the event structure and ensured that all necessary information is captured and validated before sending events.
  • Loading branch information
restevens402 committed Aug 23, 2024
1 parent 15e3e5f commit fbbd1c6
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 138 deletions.
60 changes: 39 additions & 21 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package event

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -22,9 +23,13 @@ const (
)

type Event struct {
Name string
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{}
Name string `json:"name"`
Timestamp time.Time `json:"timestamp"`
PeerID string `json:"peer_id"`
WorkType string `json:"work_type"`
RemoteWorker bool `json:"remote_worker"`
Success bool `json:"success"`
Error string `json:"error"`
}

type EventTracker struct {
Expand Down Expand Up @@ -52,24 +57,19 @@ func NewEventTracker(config *Config) *EventTracker {
}
}

func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) {
func (a *EventTracker) TrackEvent(event Event) {
if a == nil {
return
}

a.mu.Lock()
defer a.mu.Unlock()

event := Event{
Name: name,
Timestamp: time.Now().UTC(),
Data: data,
}

event.Timestamp = time.Now().UTC()
a.events = append(a.events, event)
a.logger.WithFields(logrus.Fields{
"event_name": name,
"data": data,
"event_name": event.Name,
"data": event,
}).Info("Event tracked")
}

Expand All @@ -96,28 +96,46 @@ func (a *EventTracker) ClearEvents() {
a.logger.Info("Events cleared")
}

func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error {
func (a *EventTracker) TrackAndSendEvent(event Event, client *EventClient) error {
a.mu.Lock()
defer a.mu.Unlock()

event := Event{
Name: name,
Timestamp: time.Now().UTC(),
Data: data,
}

event.Timestamp = time.Now().UTC()
a.events = append(a.events, event)
a.logger.WithFields(logrus.Fields{
"event_name": name,
"data": data,
"event_name": event.Name,
"data": event,
}).Info("Event tracked")

if client != nil {
return client.SendEvent(event)
} else {
if a.apiClient != nil {
err := validateEvent(event)
if err != nil {
return err
}
return a.apiClient.SendEvent(event)
}
}
return fmt.Errorf("no client available")
}

func validateEvent(event Event) error {
if event.Name == "" {
return errors.New("Event name is required")
}
if event.Timestamp.IsZero() {
return errors.New("Invalid timestamp")
}
if event.Timestamp.After(time.Now().UTC()) {
return errors.New("Timestamp cannot be in the future")
}
if event.PeerID == "" {
return errors.New("Peer ID is required")
}
if event.WorkType == "" {
return errors.New("Work type is required")
}
return nil
}
193 changes: 92 additions & 101 deletions pkg/event/event_library.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
package event

import (
"fmt"
"time"

"github.com/sirupsen/logrus"

data_types "github.com/masa-finance/masa-oracle/pkg/workers/types"
)

// TrackWorkDistribution records the distribution of work to a worker.
//
// Parameters:
// - workType: The type of work being distributed (e.g., Twitter, Web, Discord)
// - remoteWorker: Boolean indicating if the work is sent to a remote worker (true) or executed locally (false)
// - peerId: String containing the peer ID
//
// The event will contain the following data:
// - "peer_id": String containing the peer ID
// - "work_type": The WorkerType as a string
// - "remote_worker": Boolean indicating if it's a remote worker
func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string) {
err := a.TrackAndSendEvent(WorkDistribution, map[string]interface{}{
"peer_id": peerId,
"work_type": workType,
"remote_worker": remoteWorker,
}, nil)
func (a *EventTracker) TrackWorkDistribution(remoteWorker bool, peerId string) {
event := Event{
Name: WorkDistribution,
PeerID: peerId,
WorkType: WorkDistribution,
RemoteWorker: remoteWorker,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work distribution event: %s", err)
}
Expand All @@ -33,20 +29,17 @@ func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, rem
// TrackWorkCompletion records the completion of a work item.
//
// Parameters:
// - workType: The type of work that was completed
// - success: Boolean indicating if the work was completed successfully
// - peerId: String containing the peer ID
//
// The event will contain the following data:
// - "peer_id": String containing the peer ID
// - "work_type": The WorkerType as a string
// - "success": Boolean indicating if the work was successful
func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string) {
err := a.TrackAndSendEvent(WorkCompletion, map[string]interface{}{
"peer_id": peerId,
"work_type": workType,
"success": success,
}, nil)
func (a *EventTracker) TrackWorkCompletion(success bool, peerId string) {
event := Event{
Name: WorkCompletion,
PeerID: peerId,
WorkType: WorkCompletion,
Success: success,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work completion event: %s", err)
}
Expand All @@ -55,20 +48,17 @@ func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, succe
// TrackWorkerFailure records a failure that occurred during work execution.
//
// Parameters:
// - workType: The type of work that failed
// - errorMessage: A string describing the error that occurred
// - peerId: String containing the peer ID
//
// The event will contain the following data:
// - "peer_id": String containing the peer ID
// - "work_type": The WorkerType as a string
// - "error": String containing the error message
func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string) {
err := a.TrackAndSendEvent(WorkFailure, map[string]interface{}{
"peer_id": peerId,
"work_type": workType,
"error": errorMessage,
}, nil)
func (a *EventTracker) TrackWorkerFailure(errorMessage string, peerId string) {
event := Event{
Name: WorkFailure,
PeerID: peerId,
WorkType: WorkFailure,
Error: errorMessage,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking worker failure event: %s", err)
}
Expand All @@ -77,20 +67,17 @@ func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorM
// TrackWorkExecutionStart records the start of work execution.
//
// Parameters:
// - workType: The type of work being executed
// - remoteWorker: Boolean indicating if the work is executed by a remote worker (true) or locally (false)
// - peerId: String containing the peer ID
//
// The event will contain the following data:
// - "work_type": The WorkerType as a string
// - "remote_worker": Boolean indicating if it's a remote worker
// - "peer_id": String containing the peer ID
func (a *EventTracker) TrackWorkExecutionStart(workType data_types.WorkerType, remoteWorker bool, peerId string) {
err := a.TrackAndSendEvent(WorkExecutionStart, map[string]interface{}{
"work_type": workType,
"remote_worker": remoteWorker,
"peer_id": peerId,
}, nil)
func (a *EventTracker) TrackWorkExecutionStart(remoteWorker bool, peerId string) {
event := Event{
Name: WorkExecutionStart,
PeerID: peerId,
WorkType: WorkExecutionStart,
RemoteWorker: remoteWorker,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work execution start event: %s", err)
}
Expand All @@ -99,17 +86,17 @@ func (a *EventTracker) TrackWorkExecutionStart(workType data_types.WorkerType, r
// TrackWorkExecutionTimeout records when work execution times out.
//
// Parameters:
// - workType: The type of work that timed out
// - timeoutDuration: The duration of the timeout
//
// The event will contain the following data:
// - "work_type": The WorkerType as a string
// - "timeout_duration": The duration of the timeout
func (a *EventTracker) TrackWorkExecutionTimeout(workType data_types.WorkerType, timeoutDuration time.Duration) {
err := a.TrackAndSendEvent(WorkExecutionTimeout, map[string]interface{}{
"work_type": workType,
"timeout_duration": timeoutDuration,
}, nil)
// - peerId: String containing the peer ID
func (a *EventTracker) TrackWorkExecutionTimeout(timeoutDuration time.Duration, peerId string) {
event := Event{
Name: WorkExecutionTimeout,
PeerID: peerId,
WorkType: WorkExecutionTimeout,
Error: fmt.Sprintf("timeout after %s", timeoutDuration),
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work execution timeout event: %s", err)
}
Expand All @@ -119,13 +106,14 @@ func (a *EventTracker) TrackWorkExecutionTimeout(workType data_types.WorkerType,
//
// Parameters:
// - peerId: String containing the peer ID
//
// The event will contain the following data:
// - "peer_id": String containing the peer ID
func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) {
err := a.TrackAndSendEvent(RemoteWorkerConnection, map[string]interface{}{
"peer_id": peerId,
}, nil)
event := Event{
Name: RemoteWorkerConnection,
PeerID: peerId,
WorkType: RemoteWorkerConnection,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking remote worker connection event: %s", err)
}
Expand All @@ -136,15 +124,15 @@ func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) {
// Parameters:
// - peerId: String containing the peer ID
// - protocol: The protocol used for the stream
//
// The event will contain the following data:
// - "peer_id": String containing the peer ID
// - "protocol": The protocol used for the stream
func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) {
err := a.TrackAndSendEvent(StreamCreation, map[string]interface{}{
"peer_id": peerId,
"protocol": protocol,
}, nil)
event := Event{
Name: StreamCreation,
PeerID: peerId,
WorkType: StreamCreation,
Error: protocol, // Assuming protocol is stored in Error field for now
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking stream creation event: %s", err)
}
Expand All @@ -153,17 +141,17 @@ func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) {
// TrackWorkRequestSerialization records when a work request is serialized for transmission.
//
// Parameters:
// - workType: The type of work being serialized
// - dataSize: The size of the serialized data
//
// The event will contain the following data:
// - "work_type": The WorkerType as a string
// - "data_size": The size of the serialized data
func (a *EventTracker) TrackWorkRequestSerialization(workType data_types.WorkerType, dataSize int) {
err := a.TrackAndSendEvent(WorkRequestSerialization, map[string]interface{}{
"work_type": workType,
"data_size": dataSize,
}, nil)
// - peerId: String containing the peer ID
func (a *EventTracker) TrackWorkRequestSerialization(dataSize int, peerId string) {
event := Event{
Name: WorkRequestSerialization,
PeerID: peerId,
WorkType: WorkRequestSerialization,
Error: fmt.Sprintf("data size: %d", dataSize), // Assuming data size is stored in Error field for now
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work request serialization event: %s", err)
}
Expand All @@ -172,17 +160,17 @@ func (a *EventTracker) TrackWorkRequestSerialization(workType data_types.WorkerT
// TrackWorkResponseDeserialization records when a work response is deserialized after reception.
//
// Parameters:
// - workType: The type of work being deserialized
// - success: Boolean indicating if the deserialization was successful
//
// The event will contain the following data:
// - "work_type": The WorkerType as a string
// - "success": Boolean indicating if the deserialization was successful
func (a *EventTracker) TrackWorkResponseDeserialization(workType data_types.WorkerType, success bool) {
err := a.TrackAndSendEvent(WorkResponseDeserialization, map[string]interface{}{
"work_type": workType,
"success": success,
}, nil)
// - peerId: String containing the peer ID
func (a *EventTracker) TrackWorkResponseDeserialization(success bool, peerId string) {
event := Event{
Name: WorkResponseDeserialization,
PeerID: peerId,
WorkType: WorkResponseDeserialization,
Success: success,
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking work response deserialization event: %s", err)
}
Expand All @@ -192,13 +180,16 @@ func (a *EventTracker) TrackWorkResponseDeserialization(workType data_types.Work
//
// Parameters:
// - reason: The reason for the fallback
//
// The event will contain the following data:
// - "reason": The reason for the fallback
func (a *EventTracker) TrackLocalWorkerFallback(reason string) {
err := a.TrackAndSendEvent(LocalWorkerFallback, map[string]interface{}{
"reason": reason,
}, nil)
// - peerId: String containing the peer ID
func (a *EventTracker) TrackLocalWorkerFallback(reason string, peerId string) {
event := Event{
Name: LocalWorkerFallback,
PeerID: peerId,
WorkType: LocalWorkerFallback,
Error: reason, // Assuming reason is stored in Error field for now
Timestamp: time.Now().UTC(),
}
err := a.TrackAndSendEvent(event, nil)
if err != nil {
logrus.Errorf("error tracking local worker fallback event: %s", err)
}
Expand Down
Loading

0 comments on commit fbbd1c6

Please sign in to comment.