From f9ce8a78311d9935308a0411e0df7511a73dea2e Mon Sep 17 00:00:00 2001 From: Brian Ginsburg <7957636+bgins@users.noreply.github.com> Date: Thu, 31 Oct 2024 19:17:17 -0700 Subject: [PATCH] feat: Add job creator CLI run trace (#418) * feat: Add job creator run trace * chore: Remove commented code --- pkg/jobcreator/run.go | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index db762d57..39e70ae9 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -1,12 +1,15 @@ package jobcreator import ( + "errors" "fmt" "time" "github.com/lilypad-tech/lilypad/pkg/data" "github.com/lilypad-tech/lilypad/pkg/system" "github.com/lilypad-tech/lilypad/pkg/web3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) @@ -46,20 +49,42 @@ func RunJob( // wait a short period because we've just started the job creator service time.Sleep(100 * time.Millisecond) + // Start run job trace + c, span := jobCreatorService.controller.tracer.Start(ctx.Ctx, "run_job", + trace.WithAttributes( + attribute.String("job_offer.job_creator", offer.JobCreator), + attribute.String("job_offer.module.repo", offer.Module.Repo), + attribute.String("job_offer.module.hash", offer.Module.Hash), + attribute.String("job_offer.mode", string(offer.Mode)), + )) + ctx.Ctx = c + defer span.End() + + span.AddEvent("add_job_offer.start") jobOfferContainer, err := jobCreatorService.AddJobOffer(offer) if err != nil { jobCreatorService.controller.log.Error("failed to add job offer", err) + span.SetStatus(codes.Error, "failed to add job offer") + span.RecordError(err) return nil, err } jobCreatorService.controller.log.Debug("job offer ID", jobOfferContainer.ID) + span.AddEvent("add_job_offer.done", + trace.WithAttributes( + attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID), + attribute.String("job_offer_container.state", data.GetAgreementStateString(jobOfferContainer.State)), + )) + span.SetAttributes(attribute.String("job_offer.id", jobOfferContainer.JobOffer.ID), + attribute.String("deal.id", jobOfferContainer.DealID)) updateChan := make(chan data.JobOfferContainer) jobCreatorService.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) { - // spew.Dump(evOffer) if evOffer.JobOffer.ID != jobOfferContainer.ID { return } + span.AddEvent("job_offer_update", + trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State)))) updateChan <- evOffer }) @@ -71,9 +96,14 @@ waitloop: select { // this means the job was cancelled case err := <-jobCreatorErrors: + span.SetStatus(codes.Error, "job cancelled") + span.RecordError(err) return nil, err case <-ctx.Ctx.Done(): - return nil, fmt.Errorf("job cancelled") + err = errors.New("job cancelled by closed context") + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + return nil, err case finalJobOffer = <-updateChan: if data.IsTerminalAgreementState(finalJobOffer.State) { break waitloop @@ -83,13 +113,17 @@ waitloop: // Check if our job was cancelled if finalJobOffer.State == data.GetAgreementStateIndex("JobOfferCancelled") { + span.SetStatus(codes.Error, "job cancelled") + span.RecordError(err) return nil, fmt.Errorf("job was cancelled") } + span.AddEvent("get_result.start") result, err := jobCreatorService.GetResult(finalJobOffer.DealID) if err != nil { return nil, err } + span.AddEvent("get_result.done", trace.WithAttributes(attribute.String("result.deal_id", result.DealID))) return &RunJobResults{ JobOffer: finalJobOffer,