From fbbd1c6525ddab936411a4800b9fbe5c33938bca Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Fri, 23 Aug 2024 14:10:43 -0700 Subject: [PATCH] Refactor event tracking to include peer ID and other details Updated event tracking functions to accept additional parameters such as peer ID. Simplified the event structure and ensured that all necessary information is captured and validated before sending events. --- pkg/event/event.go | 60 +++++++---- pkg/event/event_library.go | 193 ++++++++++++++++------------------ pkg/workers/worker_manager.go | 32 +++--- 3 files changed, 147 insertions(+), 138 deletions(-) diff --git a/pkg/event/event.go b/pkg/event/event.go index 19141750..08c7c847 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -1,6 +1,7 @@ package event import ( + "errors" "fmt" "sync" "time" @@ -22,9 +23,13 @@ const ( ) type Event struct { - Name string - Timestamp time.Time `json:"timestamp"` - Data map[string]interface{} + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + PeerID string `json:"peer_id"` + WorkType string `json:"work_type"` + RemoteWorker bool `json:"remote_worker"` + Success bool `json:"success"` + Error string `json:"error"` } type EventTracker struct { @@ -52,7 +57,7 @@ func NewEventTracker(config *Config) *EventTracker { } } -func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) { +func (a *EventTracker) TrackEvent(event Event) { if a == nil { return } @@ -60,16 +65,11 @@ func (a *EventTracker) TrackEvent(name string, data map[string]interface{}) { a.mu.Lock() defer a.mu.Unlock() - event := Event{ - Name: name, - Timestamp: time.Now().UTC(), - Data: data, - } - + event.Timestamp = time.Now().UTC() a.events = append(a.events, event) a.logger.WithFields(logrus.Fields{ - "event_name": name, - "data": data, + "event_name": event.Name, + "data": event, }).Info("Event tracked") } @@ -96,28 +96,46 @@ func (a *EventTracker) ClearEvents() { a.logger.Info("Events cleared") } -func (a *EventTracker) TrackAndSendEvent(name string, data map[string]interface{}, client *EventClient) error { +func (a *EventTracker) TrackAndSendEvent(event Event, client *EventClient) error { a.mu.Lock() defer a.mu.Unlock() - event := Event{ - Name: name, - Timestamp: time.Now().UTC(), - Data: data, - } - + event.Timestamp = time.Now().UTC() a.events = append(a.events, event) a.logger.WithFields(logrus.Fields{ - "event_name": name, - "data": data, + "event_name": event.Name, + "data": event, }).Info("Event tracked") if client != nil { return client.SendEvent(event) } else { if a.apiClient != nil { + err := validateEvent(event) + if err != nil { + return err + } return a.apiClient.SendEvent(event) } } return fmt.Errorf("no client available") } + +func validateEvent(event Event) error { + if event.Name == "" { + return errors.New("Event name is required") + } + if event.Timestamp.IsZero() { + return errors.New("Invalid timestamp") + } + if event.Timestamp.After(time.Now().UTC()) { + return errors.New("Timestamp cannot be in the future") + } + if event.PeerID == "" { + return errors.New("Peer ID is required") + } + if event.WorkType == "" { + return errors.New("Work type is required") + } + return nil +} diff --git a/pkg/event/event_library.go b/pkg/event/event_library.go index b6664751..a90be6a9 100644 --- a/pkg/event/event_library.go +++ b/pkg/event/event_library.go @@ -1,30 +1,26 @@ package event import ( + "fmt" "time" "github.com/sirupsen/logrus" - - data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) // TrackWorkDistribution records the distribution of work to a worker. // // Parameters: -// - workType: The type of work being distributed (e.g., Twitter, Web, Discord) // - remoteWorker: Boolean indicating if the work is sent to a remote worker (true) or executed locally (false) // - peerId: String containing the peer ID -// -// The event will contain the following data: -// - "peer_id": String containing the peer ID -// - "work_type": The WorkerType as a string -// - "remote_worker": Boolean indicating if it's a remote worker -func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, remoteWorker bool, peerId string) { - err := a.TrackAndSendEvent(WorkDistribution, map[string]interface{}{ - "peer_id": peerId, - "work_type": workType, - "remote_worker": remoteWorker, - }, nil) +func (a *EventTracker) TrackWorkDistribution(remoteWorker bool, peerId string) { + event := Event{ + Name: WorkDistribution, + PeerID: peerId, + WorkType: WorkDistribution, + RemoteWorker: remoteWorker, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work distribution event: %s", err) } @@ -33,20 +29,17 @@ func (a *EventTracker) TrackWorkDistribution(workType data_types.WorkerType, rem // TrackWorkCompletion records the completion of a work item. // // Parameters: -// - workType: The type of work that was completed // - success: Boolean indicating if the work was completed successfully // - peerId: String containing the peer ID -// -// The event will contain the following data: -// - "peer_id": String containing the peer ID -// - "work_type": The WorkerType as a string -// - "success": Boolean indicating if the work was successful -func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, success bool, peerId string) { - err := a.TrackAndSendEvent(WorkCompletion, map[string]interface{}{ - "peer_id": peerId, - "work_type": workType, - "success": success, - }, nil) +func (a *EventTracker) TrackWorkCompletion(success bool, peerId string) { + event := Event{ + Name: WorkCompletion, + PeerID: peerId, + WorkType: WorkCompletion, + Success: success, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work completion event: %s", err) } @@ -55,20 +48,17 @@ func (a *EventTracker) TrackWorkCompletion(workType data_types.WorkerType, succe // TrackWorkerFailure records a failure that occurred during work execution. // // Parameters: -// - workType: The type of work that failed // - errorMessage: A string describing the error that occurred // - peerId: String containing the peer ID -// -// The event will contain the following data: -// - "peer_id": String containing the peer ID -// - "work_type": The WorkerType as a string -// - "error": String containing the error message -func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorMessage string, peerId string) { - err := a.TrackAndSendEvent(WorkFailure, map[string]interface{}{ - "peer_id": peerId, - "work_type": workType, - "error": errorMessage, - }, nil) +func (a *EventTracker) TrackWorkerFailure(errorMessage string, peerId string) { + event := Event{ + Name: WorkFailure, + PeerID: peerId, + WorkType: WorkFailure, + Error: errorMessage, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking worker failure event: %s", err) } @@ -77,20 +67,17 @@ func (a *EventTracker) TrackWorkerFailure(workType data_types.WorkerType, errorM // TrackWorkExecutionStart records the start of work execution. // // Parameters: -// - workType: The type of work being executed // - remoteWorker: Boolean indicating if the work is executed by a remote worker (true) or locally (false) // - peerId: String containing the peer ID -// -// The event will contain the following data: -// - "work_type": The WorkerType as a string -// - "remote_worker": Boolean indicating if it's a remote worker -// - "peer_id": String containing the peer ID -func (a *EventTracker) TrackWorkExecutionStart(workType data_types.WorkerType, remoteWorker bool, peerId string) { - err := a.TrackAndSendEvent(WorkExecutionStart, map[string]interface{}{ - "work_type": workType, - "remote_worker": remoteWorker, - "peer_id": peerId, - }, nil) +func (a *EventTracker) TrackWorkExecutionStart(remoteWorker bool, peerId string) { + event := Event{ + Name: WorkExecutionStart, + PeerID: peerId, + WorkType: WorkExecutionStart, + RemoteWorker: remoteWorker, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work execution start event: %s", err) } @@ -99,17 +86,17 @@ func (a *EventTracker) TrackWorkExecutionStart(workType data_types.WorkerType, r // TrackWorkExecutionTimeout records when work execution times out. // // Parameters: -// - workType: The type of work that timed out // - timeoutDuration: The duration of the timeout -// -// The event will contain the following data: -// - "work_type": The WorkerType as a string -// - "timeout_duration": The duration of the timeout -func (a *EventTracker) TrackWorkExecutionTimeout(workType data_types.WorkerType, timeoutDuration time.Duration) { - err := a.TrackAndSendEvent(WorkExecutionTimeout, map[string]interface{}{ - "work_type": workType, - "timeout_duration": timeoutDuration, - }, nil) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkExecutionTimeout(timeoutDuration time.Duration, peerId string) { + event := Event{ + Name: WorkExecutionTimeout, + PeerID: peerId, + WorkType: WorkExecutionTimeout, + Error: fmt.Sprintf("timeout after %s", timeoutDuration), + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work execution timeout event: %s", err) } @@ -119,13 +106,14 @@ func (a *EventTracker) TrackWorkExecutionTimeout(workType data_types.WorkerType, // // Parameters: // - peerId: String containing the peer ID -// -// The event will contain the following data: -// - "peer_id": String containing the peer ID func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) { - err := a.TrackAndSendEvent(RemoteWorkerConnection, map[string]interface{}{ - "peer_id": peerId, - }, nil) + event := Event{ + Name: RemoteWorkerConnection, + PeerID: peerId, + WorkType: RemoteWorkerConnection, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking remote worker connection event: %s", err) } @@ -136,15 +124,15 @@ func (a *EventTracker) TrackRemoteWorkerConnection(peerId string) { // Parameters: // - peerId: String containing the peer ID // - protocol: The protocol used for the stream -// -// The event will contain the following data: -// - "peer_id": String containing the peer ID -// - "protocol": The protocol used for the stream func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) { - err := a.TrackAndSendEvent(StreamCreation, map[string]interface{}{ - "peer_id": peerId, - "protocol": protocol, - }, nil) + event := Event{ + Name: StreamCreation, + PeerID: peerId, + WorkType: StreamCreation, + Error: protocol, // Assuming protocol is stored in Error field for now + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking stream creation event: %s", err) } @@ -153,17 +141,17 @@ func (a *EventTracker) TrackStreamCreation(peerId string, protocol string) { // TrackWorkRequestSerialization records when a work request is serialized for transmission. // // Parameters: -// - workType: The type of work being serialized // - dataSize: The size of the serialized data -// -// The event will contain the following data: -// - "work_type": The WorkerType as a string -// - "data_size": The size of the serialized data -func (a *EventTracker) TrackWorkRequestSerialization(workType data_types.WorkerType, dataSize int) { - err := a.TrackAndSendEvent(WorkRequestSerialization, map[string]interface{}{ - "work_type": workType, - "data_size": dataSize, - }, nil) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkRequestSerialization(dataSize int, peerId string) { + event := Event{ + Name: WorkRequestSerialization, + PeerID: peerId, + WorkType: WorkRequestSerialization, + Error: fmt.Sprintf("data size: %d", dataSize), // Assuming data size is stored in Error field for now + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work request serialization event: %s", err) } @@ -172,17 +160,17 @@ func (a *EventTracker) TrackWorkRequestSerialization(workType data_types.WorkerT // TrackWorkResponseDeserialization records when a work response is deserialized after reception. // // Parameters: -// - workType: The type of work being deserialized // - success: Boolean indicating if the deserialization was successful -// -// The event will contain the following data: -// - "work_type": The WorkerType as a string -// - "success": Boolean indicating if the deserialization was successful -func (a *EventTracker) TrackWorkResponseDeserialization(workType data_types.WorkerType, success bool) { - err := a.TrackAndSendEvent(WorkResponseDeserialization, map[string]interface{}{ - "work_type": workType, - "success": success, - }, nil) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackWorkResponseDeserialization(success bool, peerId string) { + event := Event{ + Name: WorkResponseDeserialization, + PeerID: peerId, + WorkType: WorkResponseDeserialization, + Success: success, + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking work response deserialization event: %s", err) } @@ -192,13 +180,16 @@ func (a *EventTracker) TrackWorkResponseDeserialization(workType data_types.Work // // Parameters: // - reason: The reason for the fallback -// -// The event will contain the following data: -// - "reason": The reason for the fallback -func (a *EventTracker) TrackLocalWorkerFallback(reason string) { - err := a.TrackAndSendEvent(LocalWorkerFallback, map[string]interface{}{ - "reason": reason, - }, nil) +// - peerId: String containing the peer ID +func (a *EventTracker) TrackLocalWorkerFallback(reason string, peerId string) { + event := Event{ + Name: LocalWorkerFallback, + PeerID: peerId, + WorkType: LocalWorkerFallback, + Error: reason, // Assuming reason is stored in Error field for now + Timestamp: time.Now().UTC(), + } + err := a.TrackAndSendEvent(event, nil) if err != nil { logrus.Errorf("error tracking local worker fallback event: %s", err) } diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index e50be20d..9454cdd9 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -114,12 +114,12 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers) response = whm.sendWorkToWorker(node, worker, workRequest) if response.Error != "" { - whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) logrus.Errorf("error sending work to worker: %s: %s", response.WorkerPeerId, response.Error) logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId) continue } - whm.eventTracker.TrackWorkCompletion(workRequest.WorkType, response.Error == "", worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkCompletion(response.Error == "", worker.AddrInfo.ID.String()) return response } // Fallback to local execution if local worker is eligible @@ -130,9 +130,9 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest } else { reason = "no remote workers available" } - whm.eventTracker.TrackLocalWorkerFallback(reason) - whm.eventTracker.TrackWorkExecutionStart(workRequest.WorkType, false, localWorker.AddrInfo.ID.String()) - return whm.ExecuteWork(workRequest) + whm.eventTracker.TrackLocalWorkerFallback(reason, localWorker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkExecutionStart(false, localWorker.AddrInfo.ID.String()) + return whm.ExecuteWork(workRequest, localWorker.AddrInfo.ID.String()) } if response.Error == "" { response.Error = "no eligible workers found" @@ -148,7 +148,7 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da if err := node.Host.Connect(ctxWithTimeout, *worker.AddrInfo); err != nil { response.Error = fmt.Sprintf("failed to connect to remote peer %s: %v", worker.AddrInfo.ID.String(), err) - whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } else { whm.eventTracker.TrackRemoteWorkerConnection(worker.AddrInfo.ID.String()) @@ -157,7 +157,7 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da stream, err := node.Host.NewStream(ctxWithTimeout, worker.AddrInfo.ID, protocol) if err != nil { response.Error = fmt.Sprintf("error opening stream: %v", err) - whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } // the stream should be closed by the receiver, but keeping this here just in case @@ -182,20 +182,20 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da response.Error = fmt.Sprintf("error writing length to stream: %v", err) return } - whm.eventTracker.TrackWorkRequestSerialization(workRequest.WorkType, len(bytes)) + whm.eventTracker.TrackWorkRequestSerialization(len(bytes), worker.AddrInfo.ID.String()) _, err = stream.Write(bytes) if err != nil { response.Error = fmt.Sprintf("error writing to stream: %v", err) - whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } - whm.eventTracker.TrackWorkDistribution(workRequest.WorkType, true, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkDistribution(true, worker.AddrInfo.ID.String()) // Read the response length lengthBuf = make([]byte, 4) _, err = io.ReadFull(stream, lengthBuf) if err != nil { response.Error = fmt.Sprintf("error reading response length: %v", err) - whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) + whm.eventTracker.TrackWorkerFailure(response.Error, worker.AddrInfo.ID.String()) return } responseLength := binary.BigEndian.Uint32(lengthBuf) @@ -212,14 +212,14 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker da response.Error = fmt.Sprintf("error unmarshaling response: %v", err) return } - whm.eventTracker.TrackWorkResponseDeserialization(workRequest.WorkType, true) + whm.eventTracker.TrackWorkResponseDeserialization(true, worker.AddrInfo.ID.String()) } return response } // ExecuteWork finds and executes the work handler associated with the given name. // It tracks the call count and execution duration for the handler. -func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse) { +func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest, peerId string) (response data_types.WorkResponse) { handler, exists := whm.getWorkHandler(workRequest.WorkType) if !exists { return data_types.WorkResponse{Error: ErrHandlerNotFound.Error()} @@ -250,7 +250,7 @@ func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) ( select { case <-ctx.Done(): // Context timed out - whm.eventTracker.TrackWorkExecutionTimeout(workRequest.WorkType, workerConfig.WorkerResponseTimeout) + whm.eventTracker.TrackWorkExecutionTimeout(workerConfig.WorkerResponseTimeout, peerId) return data_types.WorkResponse{Error: "work execution timed out"} case response = <-responseChan: @@ -291,8 +291,8 @@ func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream) { return } peerId := stream.Conn().LocalPeer().String() - whm.eventTracker.TrackWorkExecutionStart(workRequest.WorkType, true, peerId) - workResponse := whm.ExecuteWork(workRequest) + whm.eventTracker.TrackWorkExecutionStart(true, peerId) + workResponse := whm.ExecuteWork(workRequest, peerId) if workResponse.Error != "" { logrus.Errorf("error from remote worker %s: executing work: %s", peerId, workResponse.Error) }